##// END OF EJS Templates
Use DottedObjectName traits in zmq and parallel modules.
Thomas Kluyver -
Show More
@@ -1,446 +1,447 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import errno
24 import errno
25 import logging
25 import logging
26 import os
26 import os
27 import re
27 import re
28 import signal
28 import signal
29
29
30 from subprocess import check_call, CalledProcessError, PIPE
30 from subprocess import check_call, CalledProcessError, PIPE
31 import zmq
31 import zmq
32 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
33
33
34 from IPython.config.application import Application, boolean_flag
34 from IPython.config.application import Application, boolean_flag
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36 from IPython.core.application import BaseIPythonApplication
36 from IPython.core.application import BaseIPythonApplication
37 from IPython.core.profiledir import ProfileDir
37 from IPython.core.profiledir import ProfileDir
38 from IPython.utils.daemonize import daemonize
38 from IPython.utils.daemonize import daemonize
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
40 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
41 DottedObjectName)
41
42
42 from IPython.parallel.apps.baseapp import (
43 from IPython.parallel.apps.baseapp import (
43 BaseParallelApplication,
44 BaseParallelApplication,
44 PIDFileError,
45 PIDFileError,
45 base_flags, base_aliases
46 base_flags, base_aliases
46 )
47 )
47
48
48
49
49 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
50 # Module level variables
51 # Module level variables
51 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
52
53
53
54
54 default_config_file_name = u'ipcluster_config.py'
55 default_config_file_name = u'ipcluster_config.py'
55
56
56
57
57 _description = """Start an IPython cluster for parallel computing.
58 _description = """Start an IPython cluster for parallel computing.
58
59
59 An IPython cluster consists of 1 controller and 1 or more engines.
60 An IPython cluster consists of 1 controller and 1 or more engines.
60 This command automates the startup of these processes using a wide
61 This command automates the startup of these processes using a wide
61 range of startup methods (SSH, local processes, PBS, mpiexec,
62 range of startup methods (SSH, local processes, PBS, mpiexec,
62 Windows HPC Server 2008). To start a cluster with 4 engines on your
63 Windows HPC Server 2008). To start a cluster with 4 engines on your
63 local host simply do 'ipcluster start n=4'. For more complex usage
64 local host simply do 'ipcluster start n=4'. For more complex usage
64 you will typically do 'ipcluster create profile=mycluster', then edit
65 you will typically do 'ipcluster create profile=mycluster', then edit
65 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
66 configuration files, followed by 'ipcluster start profile=mycluster n=4'.
66 """
67 """
67
68
68
69
69 # Exit codes for ipcluster
70 # Exit codes for ipcluster
70
71
71 # This will be the exit code if the ipcluster appears to be running because
72 # This will be the exit code if the ipcluster appears to be running because
72 # a .pid file exists
73 # a .pid file exists
73 ALREADY_STARTED = 10
74 ALREADY_STARTED = 10
74
75
75
76
76 # This will be the exit code if ipcluster stop is run, but there is not .pid
77 # This will be the exit code if ipcluster stop is run, but there is not .pid
77 # file to be found.
78 # file to be found.
78 ALREADY_STOPPED = 11
79 ALREADY_STOPPED = 11
79
80
80 # This will be the exit code if ipcluster engines is run, but there is not .pid
81 # This will be the exit code if ipcluster engines is run, but there is not .pid
81 # file to be found.
82 # file to be found.
82 NO_CLUSTER = 12
83 NO_CLUSTER = 12
83
84
84
85
85 #-----------------------------------------------------------------------------
86 #-----------------------------------------------------------------------------
86 # Main application
87 # Main application
87 #-----------------------------------------------------------------------------
88 #-----------------------------------------------------------------------------
88 start_help = """Start an IPython cluster for parallel computing
89 start_help = """Start an IPython cluster for parallel computing
89
90
90 Start an ipython cluster by its profile name or cluster
91 Start an ipython cluster by its profile name or cluster
91 directory. Cluster directories contain configuration, log and
92 directory. Cluster directories contain configuration, log and
92 security related files and are named using the convention
93 security related files and are named using the convention
93 'profile_<name>' and should be creating using the 'start'
94 'profile_<name>' and should be creating using the 'start'
94 subcommand of 'ipcluster'. If your cluster directory is in
95 subcommand of 'ipcluster'. If your cluster directory is in
95 the cwd or the ipython directory, you can simply refer to it
96 the cwd or the ipython directory, you can simply refer to it
96 using its profile name, 'ipcluster start n=4 profile=<profile>`,
97 using its profile name, 'ipcluster start n=4 profile=<profile>`,
97 otherwise use the 'profile_dir' option.
98 otherwise use the 'profile_dir' option.
98 """
99 """
99 stop_help = """Stop a running IPython cluster
100 stop_help = """Stop a running IPython cluster
100
101
101 Stop a running ipython cluster by its profile name or cluster
102 Stop a running ipython cluster by its profile name or cluster
102 directory. Cluster directories are named using the convention
103 directory. Cluster directories are named using the convention
103 'profile_<name>'. If your cluster directory is in
104 'profile_<name>'. If your cluster directory is in
104 the cwd or the ipython directory, you can simply refer to it
105 the cwd or the ipython directory, you can simply refer to it
105 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
106 using its profile name, 'ipcluster stop profile=<profile>`, otherwise
106 use the 'profile_dir' option.
107 use the 'profile_dir' option.
107 """
108 """
108 engines_help = """Start engines connected to an existing IPython cluster
109 engines_help = """Start engines connected to an existing IPython cluster
109
110
110 Start one or more engines to connect to an existing Cluster
111 Start one or more engines to connect to an existing Cluster
111 by profile name or cluster directory.
112 by profile name or cluster directory.
112 Cluster directories contain configuration, log and
113 Cluster directories contain configuration, log and
113 security related files and are named using the convention
114 security related files and are named using the convention
114 'profile_<name>' and should be creating using the 'start'
115 'profile_<name>' and should be creating using the 'start'
115 subcommand of 'ipcluster'. If your cluster directory is in
116 subcommand of 'ipcluster'. If your cluster directory is in
116 the cwd or the ipython directory, you can simply refer to it
117 the cwd or the ipython directory, you can simply refer to it
117 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
118 using its profile name, 'ipcluster engines n=4 profile=<profile>`,
118 otherwise use the 'profile_dir' option.
119 otherwise use the 'profile_dir' option.
119 """
120 """
120 stop_aliases = dict(
121 stop_aliases = dict(
121 signal='IPClusterStop.signal',
122 signal='IPClusterStop.signal',
122 profile='BaseIPythonApplication.profile',
123 profile='BaseIPythonApplication.profile',
123 profile_dir='ProfileDir.location',
124 profile_dir='ProfileDir.location',
124 )
125 )
125
126
126 class IPClusterStop(BaseParallelApplication):
127 class IPClusterStop(BaseParallelApplication):
127 name = u'ipcluster'
128 name = u'ipcluster'
128 description = stop_help
129 description = stop_help
129 config_file_name = Unicode(default_config_file_name)
130 config_file_name = Unicode(default_config_file_name)
130
131
131 signal = Int(signal.SIGINT, config=True,
132 signal = Int(signal.SIGINT, config=True,
132 help="signal to use for stopping processes.")
133 help="signal to use for stopping processes.")
133
134
134 aliases = Dict(stop_aliases)
135 aliases = Dict(stop_aliases)
135
136
136 def start(self):
137 def start(self):
137 """Start the app for the stop subcommand."""
138 """Start the app for the stop subcommand."""
138 try:
139 try:
139 pid = self.get_pid_from_file()
140 pid = self.get_pid_from_file()
140 except PIDFileError:
141 except PIDFileError:
141 self.log.critical(
142 self.log.critical(
142 'Could not read pid file, cluster is probably not running.'
143 'Could not read pid file, cluster is probably not running.'
143 )
144 )
144 # Here I exit with a unusual exit status that other processes
145 # Here I exit with a unusual exit status that other processes
145 # can watch for to learn how I existed.
146 # can watch for to learn how I existed.
146 self.remove_pid_file()
147 self.remove_pid_file()
147 self.exit(ALREADY_STOPPED)
148 self.exit(ALREADY_STOPPED)
148
149
149 if not self.check_pid(pid):
150 if not self.check_pid(pid):
150 self.log.critical(
151 self.log.critical(
151 'Cluster [pid=%r] is not running.' % pid
152 'Cluster [pid=%r] is not running.' % pid
152 )
153 )
153 self.remove_pid_file()
154 self.remove_pid_file()
154 # Here I exit with a unusual exit status that other processes
155 # Here I exit with a unusual exit status that other processes
155 # can watch for to learn how I existed.
156 # can watch for to learn how I existed.
156 self.exit(ALREADY_STOPPED)
157 self.exit(ALREADY_STOPPED)
157
158
158 elif os.name=='posix':
159 elif os.name=='posix':
159 sig = self.signal
160 sig = self.signal
160 self.log.info(
161 self.log.info(
161 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
162 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
162 )
163 )
163 try:
164 try:
164 os.kill(pid, sig)
165 os.kill(pid, sig)
165 except OSError:
166 except OSError:
166 self.log.error("Stopping cluster failed, assuming already dead.",
167 self.log.error("Stopping cluster failed, assuming already dead.",
167 exc_info=True)
168 exc_info=True)
168 self.remove_pid_file()
169 self.remove_pid_file()
169 elif os.name=='nt':
170 elif os.name=='nt':
170 try:
171 try:
171 # kill the whole tree
172 # kill the whole tree
172 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
173 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
173 except (CalledProcessError, OSError):
174 except (CalledProcessError, OSError):
174 self.log.error("Stopping cluster failed, assuming already dead.",
175 self.log.error("Stopping cluster failed, assuming already dead.",
175 exc_info=True)
176 exc_info=True)
176 self.remove_pid_file()
177 self.remove_pid_file()
177
178
178 engine_aliases = {}
179 engine_aliases = {}
179 engine_aliases.update(base_aliases)
180 engine_aliases.update(base_aliases)
180 engine_aliases.update(dict(
181 engine_aliases.update(dict(
181 n='IPClusterEngines.n',
182 n='IPClusterEngines.n',
182 elauncher = 'IPClusterEngines.engine_launcher_class',
183 elauncher = 'IPClusterEngines.engine_launcher_class',
183 ))
184 ))
184 class IPClusterEngines(BaseParallelApplication):
185 class IPClusterEngines(BaseParallelApplication):
185
186
186 name = u'ipcluster'
187 name = u'ipcluster'
187 description = engines_help
188 description = engines_help
188 usage = None
189 usage = None
189 config_file_name = Unicode(default_config_file_name)
190 config_file_name = Unicode(default_config_file_name)
190 default_log_level = logging.INFO
191 default_log_level = logging.INFO
191 classes = List()
192 classes = List()
192 def _classes_default(self):
193 def _classes_default(self):
193 from IPython.parallel.apps import launcher
194 from IPython.parallel.apps import launcher
194 launchers = launcher.all_launchers
195 launchers = launcher.all_launchers
195 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
196 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
196 return [ProfileDir]+eslaunchers
197 return [ProfileDir]+eslaunchers
197
198
198 n = Int(2, config=True,
199 n = Int(2, config=True,
199 help="The number of engines to start.")
200 help="The number of engines to start.")
200
201
201 engine_launcher_class = Unicode('LocalEngineSetLauncher',
202 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
202 config=True,
203 config=True,
203 help="The class for launching a set of Engines."
204 help="The class for launching a set of Engines."
204 )
205 )
205 daemonize = Bool(False, config=True,
206 daemonize = Bool(False, config=True,
206 help='Daemonize the ipcluster program. This implies --log-to-file')
207 help='Daemonize the ipcluster program. This implies --log-to-file')
207
208
208 def _daemonize_changed(self, name, old, new):
209 def _daemonize_changed(self, name, old, new):
209 if new:
210 if new:
210 self.log_to_file = True
211 self.log_to_file = True
211
212
212 aliases = Dict(engine_aliases)
213 aliases = Dict(engine_aliases)
213 # flags = Dict(flags)
214 # flags = Dict(flags)
214 _stopping = False
215 _stopping = False
215
216
216 def initialize(self, argv=None):
217 def initialize(self, argv=None):
217 super(IPClusterEngines, self).initialize(argv)
218 super(IPClusterEngines, self).initialize(argv)
218 self.init_signal()
219 self.init_signal()
219 self.init_launchers()
220 self.init_launchers()
220
221
221 def init_launchers(self):
222 def init_launchers(self):
222 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
223 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
223 self.engine_launcher.on_stop(lambda r: self.loop.stop())
224 self.engine_launcher.on_stop(lambda r: self.loop.stop())
224
225
225 def init_signal(self):
226 def init_signal(self):
226 # Setup signals
227 # Setup signals
227 signal.signal(signal.SIGINT, self.sigint_handler)
228 signal.signal(signal.SIGINT, self.sigint_handler)
228
229
229 def build_launcher(self, clsname):
230 def build_launcher(self, clsname):
230 """import and instantiate a Launcher based on importstring"""
231 """import and instantiate a Launcher based on importstring"""
231 if '.' not in clsname:
232 if '.' not in clsname:
232 # not a module, presume it's the raw name in apps.launcher
233 # not a module, presume it's the raw name in apps.launcher
233 clsname = 'IPython.parallel.apps.launcher.'+clsname
234 clsname = 'IPython.parallel.apps.launcher.'+clsname
234 # print repr(clsname)
235 # print repr(clsname)
235 klass = import_item(clsname)
236 klass = import_item(clsname)
236
237
237 launcher = klass(
238 launcher = klass(
238 work_dir=self.profile_dir.location, config=self.config, log=self.log
239 work_dir=self.profile_dir.location, config=self.config, log=self.log
239 )
240 )
240 return launcher
241 return launcher
241
242
242 def start_engines(self):
243 def start_engines(self):
243 self.log.info("Starting %i engines"%self.n)
244 self.log.info("Starting %i engines"%self.n)
244 self.engine_launcher.start(
245 self.engine_launcher.start(
245 self.n,
246 self.n,
246 self.profile_dir.location
247 self.profile_dir.location
247 )
248 )
248
249
249 def stop_engines(self):
250 def stop_engines(self):
250 self.log.info("Stopping Engines...")
251 self.log.info("Stopping Engines...")
251 if self.engine_launcher.running:
252 if self.engine_launcher.running:
252 d = self.engine_launcher.stop()
253 d = self.engine_launcher.stop()
253 return d
254 return d
254 else:
255 else:
255 return None
256 return None
256
257
257 def stop_launchers(self, r=None):
258 def stop_launchers(self, r=None):
258 if not self._stopping:
259 if not self._stopping:
259 self._stopping = True
260 self._stopping = True
260 self.log.error("IPython cluster: stopping")
261 self.log.error("IPython cluster: stopping")
261 self.stop_engines()
262 self.stop_engines()
262 # Wait a few seconds to let things shut down.
263 # Wait a few seconds to let things shut down.
263 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
264 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
264 dc.start()
265 dc.start()
265
266
266 def sigint_handler(self, signum, frame):
267 def sigint_handler(self, signum, frame):
267 self.log.debug("SIGINT received, stopping launchers...")
268 self.log.debug("SIGINT received, stopping launchers...")
268 self.stop_launchers()
269 self.stop_launchers()
269
270
270 def start_logging(self):
271 def start_logging(self):
271 # Remove old log files of the controller and engine
272 # Remove old log files of the controller and engine
272 if self.clean_logs:
273 if self.clean_logs:
273 log_dir = self.profile_dir.log_dir
274 log_dir = self.profile_dir.log_dir
274 for f in os.listdir(log_dir):
275 for f in os.listdir(log_dir):
275 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
276 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
276 os.remove(os.path.join(log_dir, f))
277 os.remove(os.path.join(log_dir, f))
277 # This will remove old log files for ipcluster itself
278 # This will remove old log files for ipcluster itself
278 # super(IPBaseParallelApplication, self).start_logging()
279 # super(IPBaseParallelApplication, self).start_logging()
279
280
280 def start(self):
281 def start(self):
281 """Start the app for the engines subcommand."""
282 """Start the app for the engines subcommand."""
282 self.log.info("IPython cluster: started")
283 self.log.info("IPython cluster: started")
283 # First see if the cluster is already running
284 # First see if the cluster is already running
284
285
285 # Now log and daemonize
286 # Now log and daemonize
286 self.log.info(
287 self.log.info(
287 'Starting engines with [daemon=%r]' % self.daemonize
288 'Starting engines with [daemon=%r]' % self.daemonize
288 )
289 )
289 # TODO: Get daemonize working on Windows or as a Windows Server.
290 # TODO: Get daemonize working on Windows or as a Windows Server.
290 if self.daemonize:
291 if self.daemonize:
291 if os.name=='posix':
292 if os.name=='posix':
292 daemonize()
293 daemonize()
293
294
294 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
295 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
295 dc.start()
296 dc.start()
296 # Now write the new pid file AFTER our new forked pid is active.
297 # Now write the new pid file AFTER our new forked pid is active.
297 # self.write_pid_file()
298 # self.write_pid_file()
298 try:
299 try:
299 self.loop.start()
300 self.loop.start()
300 except KeyboardInterrupt:
301 except KeyboardInterrupt:
301 pass
302 pass
302 except zmq.ZMQError as e:
303 except zmq.ZMQError as e:
303 if e.errno == errno.EINTR:
304 if e.errno == errno.EINTR:
304 pass
305 pass
305 else:
306 else:
306 raise
307 raise
307
308
308 start_aliases = {}
309 start_aliases = {}
309 start_aliases.update(engine_aliases)
310 start_aliases.update(engine_aliases)
310 start_aliases.update(dict(
311 start_aliases.update(dict(
311 delay='IPClusterStart.delay',
312 delay='IPClusterStart.delay',
312 clean_logs='IPClusterStart.clean_logs',
313 clean_logs='IPClusterStart.clean_logs',
313 ))
314 ))
314
315
315 class IPClusterStart(IPClusterEngines):
316 class IPClusterStart(IPClusterEngines):
316
317
317 name = u'ipcluster'
318 name = u'ipcluster'
318 description = start_help
319 description = start_help
319 default_log_level = logging.INFO
320 default_log_level = logging.INFO
320 auto_create = Bool(True, config=True,
321 auto_create = Bool(True, config=True,
321 help="whether to create the profile_dir if it doesn't exist")
322 help="whether to create the profile_dir if it doesn't exist")
322 classes = List()
323 classes = List()
323 def _classes_default(self,):
324 def _classes_default(self,):
324 from IPython.parallel.apps import launcher
325 from IPython.parallel.apps import launcher
325 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
326 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
326
327
327 clean_logs = Bool(True, config=True,
328 clean_logs = Bool(True, config=True,
328 help="whether to cleanup old logs before starting")
329 help="whether to cleanup old logs before starting")
329
330
330 delay = CFloat(1., config=True,
331 delay = CFloat(1., config=True,
331 help="delay (in s) between starting the controller and the engines")
332 help="delay (in s) between starting the controller and the engines")
332
333
333 controller_launcher_class = Unicode('LocalControllerLauncher',
334 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
334 config=True,
335 config=True,
335 help="The class for launching a Controller."
336 help="The class for launching a Controller."
336 )
337 )
337 reset = Bool(False, config=True,
338 reset = Bool(False, config=True,
338 help="Whether to reset config files as part of '--create'."
339 help="Whether to reset config files as part of '--create'."
339 )
340 )
340
341
341 # flags = Dict(flags)
342 # flags = Dict(flags)
342 aliases = Dict(start_aliases)
343 aliases = Dict(start_aliases)
343
344
344 def init_launchers(self):
345 def init_launchers(self):
345 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
346 self.controller_launcher = self.build_launcher(self.controller_launcher_class)
346 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
347 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
347 self.controller_launcher.on_stop(self.stop_launchers)
348 self.controller_launcher.on_stop(self.stop_launchers)
348
349
349 def start_controller(self):
350 def start_controller(self):
350 self.controller_launcher.start(
351 self.controller_launcher.start(
351 self.profile_dir.location
352 self.profile_dir.location
352 )
353 )
353
354
354 def stop_controller(self):
355 def stop_controller(self):
355 # self.log.info("In stop_controller")
356 # self.log.info("In stop_controller")
356 if self.controller_launcher and self.controller_launcher.running:
357 if self.controller_launcher and self.controller_launcher.running:
357 return self.controller_launcher.stop()
358 return self.controller_launcher.stop()
358
359
359 def stop_launchers(self, r=None):
360 def stop_launchers(self, r=None):
360 if not self._stopping:
361 if not self._stopping:
361 self.stop_controller()
362 self.stop_controller()
362 super(IPClusterStart, self).stop_launchers()
363 super(IPClusterStart, self).stop_launchers()
363
364
364 def start(self):
365 def start(self):
365 """Start the app for the start subcommand."""
366 """Start the app for the start subcommand."""
366 # First see if the cluster is already running
367 # First see if the cluster is already running
367 try:
368 try:
368 pid = self.get_pid_from_file()
369 pid = self.get_pid_from_file()
369 except PIDFileError:
370 except PIDFileError:
370 pass
371 pass
371 else:
372 else:
372 if self.check_pid(pid):
373 if self.check_pid(pid):
373 self.log.critical(
374 self.log.critical(
374 'Cluster is already running with [pid=%s]. '
375 'Cluster is already running with [pid=%s]. '
375 'use "ipcluster stop" to stop the cluster.' % pid
376 'use "ipcluster stop" to stop the cluster.' % pid
376 )
377 )
377 # Here I exit with a unusual exit status that other processes
378 # Here I exit with a unusual exit status that other processes
378 # can watch for to learn how I existed.
379 # can watch for to learn how I existed.
379 self.exit(ALREADY_STARTED)
380 self.exit(ALREADY_STARTED)
380 else:
381 else:
381 self.remove_pid_file()
382 self.remove_pid_file()
382
383
383
384
384 # Now log and daemonize
385 # Now log and daemonize
385 self.log.info(
386 self.log.info(
386 'Starting ipcluster with [daemon=%r]' % self.daemonize
387 'Starting ipcluster with [daemon=%r]' % self.daemonize
387 )
388 )
388 # TODO: Get daemonize working on Windows or as a Windows Server.
389 # TODO: Get daemonize working on Windows or as a Windows Server.
389 if self.daemonize:
390 if self.daemonize:
390 if os.name=='posix':
391 if os.name=='posix':
391 daemonize()
392 daemonize()
392
393
393 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
394 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
394 dc.start()
395 dc.start()
395 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
396 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
396 dc.start()
397 dc.start()
397 # Now write the new pid file AFTER our new forked pid is active.
398 # Now write the new pid file AFTER our new forked pid is active.
398 self.write_pid_file()
399 self.write_pid_file()
399 try:
400 try:
400 self.loop.start()
401 self.loop.start()
401 except KeyboardInterrupt:
402 except KeyboardInterrupt:
402 pass
403 pass
403 except zmq.ZMQError as e:
404 except zmq.ZMQError as e:
404 if e.errno == errno.EINTR:
405 if e.errno == errno.EINTR:
405 pass
406 pass
406 else:
407 else:
407 raise
408 raise
408 finally:
409 finally:
409 self.remove_pid_file()
410 self.remove_pid_file()
410
411
411 base='IPython.parallel.apps.ipclusterapp.IPCluster'
412 base='IPython.parallel.apps.ipclusterapp.IPCluster'
412
413
413 class IPClusterApp(Application):
414 class IPClusterApp(Application):
414 name = u'ipcluster'
415 name = u'ipcluster'
415 description = _description
416 description = _description
416
417
417 subcommands = {
418 subcommands = {
418 'start' : (base+'Start', start_help),
419 'start' : (base+'Start', start_help),
419 'stop' : (base+'Stop', stop_help),
420 'stop' : (base+'Stop', stop_help),
420 'engines' : (base+'Engines', engines_help),
421 'engines' : (base+'Engines', engines_help),
421 }
422 }
422
423
423 # no aliases or flags for parent App
424 # no aliases or flags for parent App
424 aliases = Dict()
425 aliases = Dict()
425 flags = Dict()
426 flags = Dict()
426
427
427 def start(self):
428 def start(self):
428 if self.subapp is None:
429 if self.subapp is None:
429 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
430 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
430 print
431 print
431 self.print_description()
432 self.print_description()
432 self.print_subcommands()
433 self.print_subcommands()
433 self.exit(1)
434 self.exit(1)
434 else:
435 else:
435 return self.subapp.start()
436 return self.subapp.start()
436
437
437 def launch_new_instance():
438 def launch_new_instance():
438 """Create and run the IPython cluster."""
439 """Create and run the IPython cluster."""
439 app = IPClusterApp.instance()
440 app = IPClusterApp.instance()
440 app.initialize()
441 app.initialize()
441 app.start()
442 app.start()
442
443
443
444
444 if __name__ == '__main__':
445 if __name__ == '__main__':
445 launch_new_instance()
446 launch_new_instance()
446
447
@@ -1,1288 +1,1288 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller Hub with 0MQ
2 """The IPython Controller Hub with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5
5
6 Authors:
6 Authors:
7
7
8 * Min RK
8 * Min RK
9 """
9 """
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Copyright (C) 2010 The IPython Development Team
11 # Copyright (C) 2010 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18 # Imports
18 # Imports
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 from __future__ import print_function
20 from __future__ import print_function
21
21
22 import sys
22 import sys
23 import time
23 import time
24 from datetime import datetime
24 from datetime import datetime
25
25
26 import zmq
26 import zmq
27 from zmq.eventloop import ioloop
27 from zmq.eventloop import ioloop
28 from zmq.eventloop.zmqstream import ZMQStream
28 from zmq.eventloop.zmqstream import ZMQStream
29
29
30 # internal:
30 # internal:
31 from IPython.utils.importstring import import_item
31 from IPython.utils.importstring import import_item
32 from IPython.utils.traitlets import (
32 from IPython.utils.traitlets import (
33 HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, Bytes
33 HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, Bytes, DottedObjectName
34 )
34 )
35
35
36 from IPython.parallel import error, util
36 from IPython.parallel import error, util
37 from IPython.parallel.factory import RegistrationFactory
37 from IPython.parallel.factory import RegistrationFactory
38
38
39 from IPython.zmq.session import SessionFactory
39 from IPython.zmq.session import SessionFactory
40
40
41 from .heartmonitor import HeartMonitor
41 from .heartmonitor import HeartMonitor
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Code
44 # Code
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 def _passer(*args, **kwargs):
47 def _passer(*args, **kwargs):
48 return
48 return
49
49
50 def _printer(*args, **kwargs):
50 def _printer(*args, **kwargs):
51 print (args)
51 print (args)
52 print (kwargs)
52 print (kwargs)
53
53
54 def empty_record():
54 def empty_record():
55 """Return an empty dict with all record keys."""
55 """Return an empty dict with all record keys."""
56 return {
56 return {
57 'msg_id' : None,
57 'msg_id' : None,
58 'header' : None,
58 'header' : None,
59 'content': None,
59 'content': None,
60 'buffers': None,
60 'buffers': None,
61 'submitted': None,
61 'submitted': None,
62 'client_uuid' : None,
62 'client_uuid' : None,
63 'engine_uuid' : None,
63 'engine_uuid' : None,
64 'started': None,
64 'started': None,
65 'completed': None,
65 'completed': None,
66 'resubmitted': None,
66 'resubmitted': None,
67 'result_header' : None,
67 'result_header' : None,
68 'result_content' : None,
68 'result_content' : None,
69 'result_buffers' : None,
69 'result_buffers' : None,
70 'queue' : None,
70 'queue' : None,
71 'pyin' : None,
71 'pyin' : None,
72 'pyout': None,
72 'pyout': None,
73 'pyerr': None,
73 'pyerr': None,
74 'stdout': '',
74 'stdout': '',
75 'stderr': '',
75 'stderr': '',
76 }
76 }
77
77
78 def init_record(msg):
78 def init_record(msg):
79 """Initialize a TaskRecord based on a request."""
79 """Initialize a TaskRecord based on a request."""
80 header = msg['header']
80 header = msg['header']
81 return {
81 return {
82 'msg_id' : header['msg_id'],
82 'msg_id' : header['msg_id'],
83 'header' : header,
83 'header' : header,
84 'content': msg['content'],
84 'content': msg['content'],
85 'buffers': msg['buffers'],
85 'buffers': msg['buffers'],
86 'submitted': header['date'],
86 'submitted': header['date'],
87 'client_uuid' : None,
87 'client_uuid' : None,
88 'engine_uuid' : None,
88 'engine_uuid' : None,
89 'started': None,
89 'started': None,
90 'completed': None,
90 'completed': None,
91 'resubmitted': None,
91 'resubmitted': None,
92 'result_header' : None,
92 'result_header' : None,
93 'result_content' : None,
93 'result_content' : None,
94 'result_buffers' : None,
94 'result_buffers' : None,
95 'queue' : None,
95 'queue' : None,
96 'pyin' : None,
96 'pyin' : None,
97 'pyout': None,
97 'pyout': None,
98 'pyerr': None,
98 'pyerr': None,
99 'stdout': '',
99 'stdout': '',
100 'stderr': '',
100 'stderr': '',
101 }
101 }
102
102
103
103
104 class EngineConnector(HasTraits):
104 class EngineConnector(HasTraits):
105 """A simple object for accessing the various zmq connections of an object.
105 """A simple object for accessing the various zmq connections of an object.
106 Attributes are:
106 Attributes are:
107 id (int): engine ID
107 id (int): engine ID
108 uuid (str): uuid (unused?)
108 uuid (str): uuid (unused?)
109 queue (str): identity of queue's XREQ socket
109 queue (str): identity of queue's XREQ socket
110 registration (str): identity of registration XREQ socket
110 registration (str): identity of registration XREQ socket
111 heartbeat (str): identity of heartbeat XREQ socket
111 heartbeat (str): identity of heartbeat XREQ socket
112 """
112 """
113 id=Int(0)
113 id=Int(0)
114 queue=Bytes()
114 queue=Bytes()
115 control=Bytes()
115 control=Bytes()
116 registration=Bytes()
116 registration=Bytes()
117 heartbeat=Bytes()
117 heartbeat=Bytes()
118 pending=Set()
118 pending=Set()
119
119
120 class HubFactory(RegistrationFactory):
120 class HubFactory(RegistrationFactory):
121 """The Configurable for setting up a Hub."""
121 """The Configurable for setting up a Hub."""
122
122
123 # port-pairs for monitoredqueues:
123 # port-pairs for monitoredqueues:
124 hb = Tuple(Int,Int,config=True,
124 hb = Tuple(Int,Int,config=True,
125 help="""XREQ/SUB Port pair for Engine heartbeats""")
125 help="""XREQ/SUB Port pair for Engine heartbeats""")
126 def _hb_default(self):
126 def _hb_default(self):
127 return tuple(util.select_random_ports(2))
127 return tuple(util.select_random_ports(2))
128
128
129 mux = Tuple(Int,Int,config=True,
129 mux = Tuple(Int,Int,config=True,
130 help="""Engine/Client Port pair for MUX queue""")
130 help="""Engine/Client Port pair for MUX queue""")
131
131
132 def _mux_default(self):
132 def _mux_default(self):
133 return tuple(util.select_random_ports(2))
133 return tuple(util.select_random_ports(2))
134
134
135 task = Tuple(Int,Int,config=True,
135 task = Tuple(Int,Int,config=True,
136 help="""Engine/Client Port pair for Task queue""")
136 help="""Engine/Client Port pair for Task queue""")
137 def _task_default(self):
137 def _task_default(self):
138 return tuple(util.select_random_ports(2))
138 return tuple(util.select_random_ports(2))
139
139
140 control = Tuple(Int,Int,config=True,
140 control = Tuple(Int,Int,config=True,
141 help="""Engine/Client Port pair for Control queue""")
141 help="""Engine/Client Port pair for Control queue""")
142
142
143 def _control_default(self):
143 def _control_default(self):
144 return tuple(util.select_random_ports(2))
144 return tuple(util.select_random_ports(2))
145
145
146 iopub = Tuple(Int,Int,config=True,
146 iopub = Tuple(Int,Int,config=True,
147 help="""Engine/Client Port pair for IOPub relay""")
147 help="""Engine/Client Port pair for IOPub relay""")
148
148
149 def _iopub_default(self):
149 def _iopub_default(self):
150 return tuple(util.select_random_ports(2))
150 return tuple(util.select_random_ports(2))
151
151
152 # single ports:
152 # single ports:
153 mon_port = Int(config=True,
153 mon_port = Int(config=True,
154 help="""Monitor (SUB) port for queue traffic""")
154 help="""Monitor (SUB) port for queue traffic""")
155
155
156 def _mon_port_default(self):
156 def _mon_port_default(self):
157 return util.select_random_ports(1)[0]
157 return util.select_random_ports(1)[0]
158
158
159 notifier_port = Int(config=True,
159 notifier_port = Int(config=True,
160 help="""PUB port for sending engine status notifications""")
160 help="""PUB port for sending engine status notifications""")
161
161
162 def _notifier_port_default(self):
162 def _notifier_port_default(self):
163 return util.select_random_ports(1)[0]
163 return util.select_random_ports(1)[0]
164
164
165 engine_ip = Unicode('127.0.0.1', config=True,
165 engine_ip = Unicode('127.0.0.1', config=True,
166 help="IP on which to listen for engine connections. [default: loopback]")
166 help="IP on which to listen for engine connections. [default: loopback]")
167 engine_transport = Unicode('tcp', config=True,
167 engine_transport = Unicode('tcp', config=True,
168 help="0MQ transport for engine connections. [default: tcp]")
168 help="0MQ transport for engine connections. [default: tcp]")
169
169
170 client_ip = Unicode('127.0.0.1', config=True,
170 client_ip = Unicode('127.0.0.1', config=True,
171 help="IP on which to listen for client connections. [default: loopback]")
171 help="IP on which to listen for client connections. [default: loopback]")
172 client_transport = Unicode('tcp', config=True,
172 client_transport = Unicode('tcp', config=True,
173 help="0MQ transport for client connections. [default : tcp]")
173 help="0MQ transport for client connections. [default : tcp]")
174
174
175 monitor_ip = Unicode('127.0.0.1', config=True,
175 monitor_ip = Unicode('127.0.0.1', config=True,
176 help="IP on which to listen for monitor messages. [default: loopback]")
176 help="IP on which to listen for monitor messages. [default: loopback]")
177 monitor_transport = Unicode('tcp', config=True,
177 monitor_transport = Unicode('tcp', config=True,
178 help="0MQ transport for monitor messages. [default : tcp]")
178 help="0MQ transport for monitor messages. [default : tcp]")
179
179
180 monitor_url = Unicode('')
180 monitor_url = Unicode('')
181
181
182 db_class = Unicode('IPython.parallel.controller.dictdb.DictDB', config=True,
182 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
183 help="""The class to use for the DB backend""")
183 config=True, help="""The class to use for the DB backend""")
184
184
185 # not configurable
185 # not configurable
186 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
186 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
187 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
187 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
188
188
189 def _ip_changed(self, name, old, new):
189 def _ip_changed(self, name, old, new):
190 self.engine_ip = new
190 self.engine_ip = new
191 self.client_ip = new
191 self.client_ip = new
192 self.monitor_ip = new
192 self.monitor_ip = new
193 self._update_monitor_url()
193 self._update_monitor_url()
194
194
195 def _update_monitor_url(self):
195 def _update_monitor_url(self):
196 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
196 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
197
197
198 def _transport_changed(self, name, old, new):
198 def _transport_changed(self, name, old, new):
199 self.engine_transport = new
199 self.engine_transport = new
200 self.client_transport = new
200 self.client_transport = new
201 self.monitor_transport = new
201 self.monitor_transport = new
202 self._update_monitor_url()
202 self._update_monitor_url()
203
203
204 def __init__(self, **kwargs):
204 def __init__(self, **kwargs):
205 super(HubFactory, self).__init__(**kwargs)
205 super(HubFactory, self).__init__(**kwargs)
206 self._update_monitor_url()
206 self._update_monitor_url()
207
207
208
208
209 def construct(self):
209 def construct(self):
210 self.init_hub()
210 self.init_hub()
211
211
212 def start(self):
212 def start(self):
213 self.heartmonitor.start()
213 self.heartmonitor.start()
214 self.log.info("Heartmonitor started")
214 self.log.info("Heartmonitor started")
215
215
216 def init_hub(self):
216 def init_hub(self):
217 """construct"""
217 """construct"""
218 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
218 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
219 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
219 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
220
220
221 ctx = self.context
221 ctx = self.context
222 loop = self.loop
222 loop = self.loop
223
223
224 # Registrar socket
224 # Registrar socket
225 q = ZMQStream(ctx.socket(zmq.XREP), loop)
225 q = ZMQStream(ctx.socket(zmq.XREP), loop)
226 q.bind(client_iface % self.regport)
226 q.bind(client_iface % self.regport)
227 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
227 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
228 if self.client_ip != self.engine_ip:
228 if self.client_ip != self.engine_ip:
229 q.bind(engine_iface % self.regport)
229 q.bind(engine_iface % self.regport)
230 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
230 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
231
231
232 ### Engine connections ###
232 ### Engine connections ###
233
233
234 # heartbeat
234 # heartbeat
235 hpub = ctx.socket(zmq.PUB)
235 hpub = ctx.socket(zmq.PUB)
236 hpub.bind(engine_iface % self.hb[0])
236 hpub.bind(engine_iface % self.hb[0])
237 hrep = ctx.socket(zmq.XREP)
237 hrep = ctx.socket(zmq.XREP)
238 hrep.bind(engine_iface % self.hb[1])
238 hrep.bind(engine_iface % self.hb[1])
239 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
239 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
240 pingstream=ZMQStream(hpub,loop),
240 pingstream=ZMQStream(hpub,loop),
241 pongstream=ZMQStream(hrep,loop)
241 pongstream=ZMQStream(hrep,loop)
242 )
242 )
243
243
244 ### Client connections ###
244 ### Client connections ###
245 # Notifier socket
245 # Notifier socket
246 n = ZMQStream(ctx.socket(zmq.PUB), loop)
246 n = ZMQStream(ctx.socket(zmq.PUB), loop)
247 n.bind(client_iface%self.notifier_port)
247 n.bind(client_iface%self.notifier_port)
248
248
249 ### build and launch the queues ###
249 ### build and launch the queues ###
250
250
251 # monitor socket
251 # monitor socket
252 sub = ctx.socket(zmq.SUB)
252 sub = ctx.socket(zmq.SUB)
253 sub.setsockopt(zmq.SUBSCRIBE, "")
253 sub.setsockopt(zmq.SUBSCRIBE, "")
254 sub.bind(self.monitor_url)
254 sub.bind(self.monitor_url)
255 sub.bind('inproc://monitor')
255 sub.bind('inproc://monitor')
256 sub = ZMQStream(sub, loop)
256 sub = ZMQStream(sub, loop)
257
257
258 # connect the db
258 # connect the db
259 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
259 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
260 # cdir = self.config.Global.cluster_dir
260 # cdir = self.config.Global.cluster_dir
261 self.db = import_item(str(self.db_class))(session=self.session.session,
261 self.db = import_item(str(self.db_class))(session=self.session.session,
262 config=self.config, log=self.log)
262 config=self.config, log=self.log)
263 time.sleep(.25)
263 time.sleep(.25)
264 try:
264 try:
265 scheme = self.config.TaskScheduler.scheme_name
265 scheme = self.config.TaskScheduler.scheme_name
266 except AttributeError:
266 except AttributeError:
267 from .scheduler import TaskScheduler
267 from .scheduler import TaskScheduler
268 scheme = TaskScheduler.scheme_name.get_default_value()
268 scheme = TaskScheduler.scheme_name.get_default_value()
269 # build connection dicts
269 # build connection dicts
270 self.engine_info = {
270 self.engine_info = {
271 'control' : engine_iface%self.control[1],
271 'control' : engine_iface%self.control[1],
272 'mux': engine_iface%self.mux[1],
272 'mux': engine_iface%self.mux[1],
273 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
273 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
274 'task' : engine_iface%self.task[1],
274 'task' : engine_iface%self.task[1],
275 'iopub' : engine_iface%self.iopub[1],
275 'iopub' : engine_iface%self.iopub[1],
276 # 'monitor' : engine_iface%self.mon_port,
276 # 'monitor' : engine_iface%self.mon_port,
277 }
277 }
278
278
279 self.client_info = {
279 self.client_info = {
280 'control' : client_iface%self.control[0],
280 'control' : client_iface%self.control[0],
281 'mux': client_iface%self.mux[0],
281 'mux': client_iface%self.mux[0],
282 'task' : (scheme, client_iface%self.task[0]),
282 'task' : (scheme, client_iface%self.task[0]),
283 'iopub' : client_iface%self.iopub[0],
283 'iopub' : client_iface%self.iopub[0],
284 'notification': client_iface%self.notifier_port
284 'notification': client_iface%self.notifier_port
285 }
285 }
286 self.log.debug("Hub engine addrs: %s"%self.engine_info)
286 self.log.debug("Hub engine addrs: %s"%self.engine_info)
287 self.log.debug("Hub client addrs: %s"%self.client_info)
287 self.log.debug("Hub client addrs: %s"%self.client_info)
288
288
289 # resubmit stream
289 # resubmit stream
290 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
290 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
291 url = util.disambiguate_url(self.client_info['task'][-1])
291 url = util.disambiguate_url(self.client_info['task'][-1])
292 r.setsockopt(zmq.IDENTITY, self.session.session)
292 r.setsockopt(zmq.IDENTITY, self.session.session)
293 r.connect(url)
293 r.connect(url)
294
294
295 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
295 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
296 query=q, notifier=n, resubmit=r, db=self.db,
296 query=q, notifier=n, resubmit=r, db=self.db,
297 engine_info=self.engine_info, client_info=self.client_info,
297 engine_info=self.engine_info, client_info=self.client_info,
298 log=self.log)
298 log=self.log)
299
299
300
300
301 class Hub(SessionFactory):
301 class Hub(SessionFactory):
302 """The IPython Controller Hub with 0MQ connections
302 """The IPython Controller Hub with 0MQ connections
303
303
304 Parameters
304 Parameters
305 ==========
305 ==========
306 loop: zmq IOLoop instance
306 loop: zmq IOLoop instance
307 session: Session object
307 session: Session object
308 <removed> context: zmq context for creating new connections (?)
308 <removed> context: zmq context for creating new connections (?)
309 queue: ZMQStream for monitoring the command queue (SUB)
309 queue: ZMQStream for monitoring the command queue (SUB)
310 query: ZMQStream for engine registration and client queries requests (XREP)
310 query: ZMQStream for engine registration and client queries requests (XREP)
311 heartbeat: HeartMonitor object checking the pulse of the engines
311 heartbeat: HeartMonitor object checking the pulse of the engines
312 notifier: ZMQStream for broadcasting engine registration changes (PUB)
312 notifier: ZMQStream for broadcasting engine registration changes (PUB)
313 db: connection to db for out of memory logging of commands
313 db: connection to db for out of memory logging of commands
314 NotImplemented
314 NotImplemented
315 engine_info: dict of zmq connection information for engines to connect
315 engine_info: dict of zmq connection information for engines to connect
316 to the queues.
316 to the queues.
317 client_info: dict of zmq connection information for engines to connect
317 client_info: dict of zmq connection information for engines to connect
318 to the queues.
318 to the queues.
319 """
319 """
320 # internal data structures:
320 # internal data structures:
321 ids=Set() # engine IDs
321 ids=Set() # engine IDs
322 keytable=Dict()
322 keytable=Dict()
323 by_ident=Dict()
323 by_ident=Dict()
324 engines=Dict()
324 engines=Dict()
325 clients=Dict()
325 clients=Dict()
326 hearts=Dict()
326 hearts=Dict()
327 pending=Set()
327 pending=Set()
328 queues=Dict() # pending msg_ids keyed by engine_id
328 queues=Dict() # pending msg_ids keyed by engine_id
329 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
329 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
330 completed=Dict() # completed msg_ids keyed by engine_id
330 completed=Dict() # completed msg_ids keyed by engine_id
331 all_completed=Set() # completed msg_ids keyed by engine_id
331 all_completed=Set() # completed msg_ids keyed by engine_id
332 dead_engines=Set() # completed msg_ids keyed by engine_id
332 dead_engines=Set() # completed msg_ids keyed by engine_id
333 unassigned=Set() # set of task msg_ds not yet assigned a destination
333 unassigned=Set() # set of task msg_ds not yet assigned a destination
334 incoming_registrations=Dict()
334 incoming_registrations=Dict()
335 registration_timeout=Int()
335 registration_timeout=Int()
336 _idcounter=Int(0)
336 _idcounter=Int(0)
337
337
338 # objects from constructor:
338 # objects from constructor:
339 query=Instance(ZMQStream)
339 query=Instance(ZMQStream)
340 monitor=Instance(ZMQStream)
340 monitor=Instance(ZMQStream)
341 notifier=Instance(ZMQStream)
341 notifier=Instance(ZMQStream)
342 resubmit=Instance(ZMQStream)
342 resubmit=Instance(ZMQStream)
343 heartmonitor=Instance(HeartMonitor)
343 heartmonitor=Instance(HeartMonitor)
344 db=Instance(object)
344 db=Instance(object)
345 client_info=Dict()
345 client_info=Dict()
346 engine_info=Dict()
346 engine_info=Dict()
347
347
348
348
349 def __init__(self, **kwargs):
349 def __init__(self, **kwargs):
350 """
350 """
351 # universal:
351 # universal:
352 loop: IOLoop for creating future connections
352 loop: IOLoop for creating future connections
353 session: streamsession for sending serialized data
353 session: streamsession for sending serialized data
354 # engine:
354 # engine:
355 queue: ZMQStream for monitoring queue messages
355 queue: ZMQStream for monitoring queue messages
356 query: ZMQStream for engine+client registration and client requests
356 query: ZMQStream for engine+client registration and client requests
357 heartbeat: HeartMonitor object for tracking engines
357 heartbeat: HeartMonitor object for tracking engines
358 # extra:
358 # extra:
359 db: ZMQStream for db connection (NotImplemented)
359 db: ZMQStream for db connection (NotImplemented)
360 engine_info: zmq address/protocol dict for engine connections
360 engine_info: zmq address/protocol dict for engine connections
361 client_info: zmq address/protocol dict for client connections
361 client_info: zmq address/protocol dict for client connections
362 """
362 """
363
363
364 super(Hub, self).__init__(**kwargs)
364 super(Hub, self).__init__(**kwargs)
365 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
365 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
366
366
367 # validate connection dicts:
367 # validate connection dicts:
368 for k,v in self.client_info.iteritems():
368 for k,v in self.client_info.iteritems():
369 if k == 'task':
369 if k == 'task':
370 util.validate_url_container(v[1])
370 util.validate_url_container(v[1])
371 else:
371 else:
372 util.validate_url_container(v)
372 util.validate_url_container(v)
373 # util.validate_url_container(self.client_info)
373 # util.validate_url_container(self.client_info)
374 util.validate_url_container(self.engine_info)
374 util.validate_url_container(self.engine_info)
375
375
376 # register our callbacks
376 # register our callbacks
377 self.query.on_recv(self.dispatch_query)
377 self.query.on_recv(self.dispatch_query)
378 self.monitor.on_recv(self.dispatch_monitor_traffic)
378 self.monitor.on_recv(self.dispatch_monitor_traffic)
379
379
380 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
380 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
381 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
381 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
382
382
383 self.monitor_handlers = { 'in' : self.save_queue_request,
383 self.monitor_handlers = { 'in' : self.save_queue_request,
384 'out': self.save_queue_result,
384 'out': self.save_queue_result,
385 'intask': self.save_task_request,
385 'intask': self.save_task_request,
386 'outtask': self.save_task_result,
386 'outtask': self.save_task_result,
387 'tracktask': self.save_task_destination,
387 'tracktask': self.save_task_destination,
388 'incontrol': _passer,
388 'incontrol': _passer,
389 'outcontrol': _passer,
389 'outcontrol': _passer,
390 'iopub': self.save_iopub_message,
390 'iopub': self.save_iopub_message,
391 }
391 }
392
392
393 self.query_handlers = {'queue_request': self.queue_status,
393 self.query_handlers = {'queue_request': self.queue_status,
394 'result_request': self.get_results,
394 'result_request': self.get_results,
395 'history_request': self.get_history,
395 'history_request': self.get_history,
396 'db_request': self.db_query,
396 'db_request': self.db_query,
397 'purge_request': self.purge_results,
397 'purge_request': self.purge_results,
398 'load_request': self.check_load,
398 'load_request': self.check_load,
399 'resubmit_request': self.resubmit_task,
399 'resubmit_request': self.resubmit_task,
400 'shutdown_request': self.shutdown_request,
400 'shutdown_request': self.shutdown_request,
401 'registration_request' : self.register_engine,
401 'registration_request' : self.register_engine,
402 'unregistration_request' : self.unregister_engine,
402 'unregistration_request' : self.unregister_engine,
403 'connection_request': self.connection_request,
403 'connection_request': self.connection_request,
404 }
404 }
405
405
406 # ignore resubmit replies
406 # ignore resubmit replies
407 self.resubmit.on_recv(lambda msg: None, copy=False)
407 self.resubmit.on_recv(lambda msg: None, copy=False)
408
408
409 self.log.info("hub::created hub")
409 self.log.info("hub::created hub")
410
410
411 @property
411 @property
412 def _next_id(self):
412 def _next_id(self):
413 """gemerate a new ID.
413 """gemerate a new ID.
414
414
415 No longer reuse old ids, just count from 0."""
415 No longer reuse old ids, just count from 0."""
416 newid = self._idcounter
416 newid = self._idcounter
417 self._idcounter += 1
417 self._idcounter += 1
418 return newid
418 return newid
419 # newid = 0
419 # newid = 0
420 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
420 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
421 # # print newid, self.ids, self.incoming_registrations
421 # # print newid, self.ids, self.incoming_registrations
422 # while newid in self.ids or newid in incoming:
422 # while newid in self.ids or newid in incoming:
423 # newid += 1
423 # newid += 1
424 # return newid
424 # return newid
425
425
426 #-----------------------------------------------------------------------------
426 #-----------------------------------------------------------------------------
427 # message validation
427 # message validation
428 #-----------------------------------------------------------------------------
428 #-----------------------------------------------------------------------------
429
429
430 def _validate_targets(self, targets):
430 def _validate_targets(self, targets):
431 """turn any valid targets argument into a list of integer ids"""
431 """turn any valid targets argument into a list of integer ids"""
432 if targets is None:
432 if targets is None:
433 # default to all
433 # default to all
434 targets = self.ids
434 targets = self.ids
435
435
436 if isinstance(targets, (int,str,unicode)):
436 if isinstance(targets, (int,str,unicode)):
437 # only one target specified
437 # only one target specified
438 targets = [targets]
438 targets = [targets]
439 _targets = []
439 _targets = []
440 for t in targets:
440 for t in targets:
441 # map raw identities to ids
441 # map raw identities to ids
442 if isinstance(t, (str,unicode)):
442 if isinstance(t, (str,unicode)):
443 t = self.by_ident.get(t, t)
443 t = self.by_ident.get(t, t)
444 _targets.append(t)
444 _targets.append(t)
445 targets = _targets
445 targets = _targets
446 bad_targets = [ t for t in targets if t not in self.ids ]
446 bad_targets = [ t for t in targets if t not in self.ids ]
447 if bad_targets:
447 if bad_targets:
448 raise IndexError("No Such Engine: %r"%bad_targets)
448 raise IndexError("No Such Engine: %r"%bad_targets)
449 if not targets:
449 if not targets:
450 raise IndexError("No Engines Registered")
450 raise IndexError("No Engines Registered")
451 return targets
451 return targets
452
452
453 #-----------------------------------------------------------------------------
453 #-----------------------------------------------------------------------------
454 # dispatch methods (1 per stream)
454 # dispatch methods (1 per stream)
455 #-----------------------------------------------------------------------------
455 #-----------------------------------------------------------------------------
456
456
457
457
458 def dispatch_monitor_traffic(self, msg):
458 def dispatch_monitor_traffic(self, msg):
459 """all ME and Task queue messages come through here, as well as
459 """all ME and Task queue messages come through here, as well as
460 IOPub traffic."""
460 IOPub traffic."""
461 self.log.debug("monitor traffic: %r"%msg[:2])
461 self.log.debug("monitor traffic: %r"%msg[:2])
462 switch = msg[0]
462 switch = msg[0]
463 try:
463 try:
464 idents, msg = self.session.feed_identities(msg[1:])
464 idents, msg = self.session.feed_identities(msg[1:])
465 except ValueError:
465 except ValueError:
466 idents=[]
466 idents=[]
467 if not idents:
467 if not idents:
468 self.log.error("Bad Monitor Message: %r"%msg)
468 self.log.error("Bad Monitor Message: %r"%msg)
469 return
469 return
470 handler = self.monitor_handlers.get(switch, None)
470 handler = self.monitor_handlers.get(switch, None)
471 if handler is not None:
471 if handler is not None:
472 handler(idents, msg)
472 handler(idents, msg)
473 else:
473 else:
474 self.log.error("Invalid monitor topic: %r"%switch)
474 self.log.error("Invalid monitor topic: %r"%switch)
475
475
476
476
477 def dispatch_query(self, msg):
477 def dispatch_query(self, msg):
478 """Route registration requests and queries from clients."""
478 """Route registration requests and queries from clients."""
479 try:
479 try:
480 idents, msg = self.session.feed_identities(msg)
480 idents, msg = self.session.feed_identities(msg)
481 except ValueError:
481 except ValueError:
482 idents = []
482 idents = []
483 if not idents:
483 if not idents:
484 self.log.error("Bad Query Message: %r"%msg)
484 self.log.error("Bad Query Message: %r"%msg)
485 return
485 return
486 client_id = idents[0]
486 client_id = idents[0]
487 try:
487 try:
488 msg = self.session.unpack_message(msg, content=True)
488 msg = self.session.unpack_message(msg, content=True)
489 except Exception:
489 except Exception:
490 content = error.wrap_exception()
490 content = error.wrap_exception()
491 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
491 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
492 self.session.send(self.query, "hub_error", ident=client_id,
492 self.session.send(self.query, "hub_error", ident=client_id,
493 content=content)
493 content=content)
494 return
494 return
495 # print client_id, header, parent, content
495 # print client_id, header, parent, content
496 #switch on message type:
496 #switch on message type:
497 msg_type = msg['msg_type']
497 msg_type = msg['msg_type']
498 self.log.info("client::client %r requested %r"%(client_id, msg_type))
498 self.log.info("client::client %r requested %r"%(client_id, msg_type))
499 handler = self.query_handlers.get(msg_type, None)
499 handler = self.query_handlers.get(msg_type, None)
500 try:
500 try:
501 assert handler is not None, "Bad Message Type: %r"%msg_type
501 assert handler is not None, "Bad Message Type: %r"%msg_type
502 except:
502 except:
503 content = error.wrap_exception()
503 content = error.wrap_exception()
504 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
504 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
505 self.session.send(self.query, "hub_error", ident=client_id,
505 self.session.send(self.query, "hub_error", ident=client_id,
506 content=content)
506 content=content)
507 return
507 return
508
508
509 else:
509 else:
510 handler(idents, msg)
510 handler(idents, msg)
511
511
512 def dispatch_db(self, msg):
512 def dispatch_db(self, msg):
513 """"""
513 """"""
514 raise NotImplementedError
514 raise NotImplementedError
515
515
516 #---------------------------------------------------------------------------
516 #---------------------------------------------------------------------------
517 # handler methods (1 per event)
517 # handler methods (1 per event)
518 #---------------------------------------------------------------------------
518 #---------------------------------------------------------------------------
519
519
520 #----------------------- Heartbeat --------------------------------------
520 #----------------------- Heartbeat --------------------------------------
521
521
522 def handle_new_heart(self, heart):
522 def handle_new_heart(self, heart):
523 """handler to attach to heartbeater.
523 """handler to attach to heartbeater.
524 Called when a new heart starts to beat.
524 Called when a new heart starts to beat.
525 Triggers completion of registration."""
525 Triggers completion of registration."""
526 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
526 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
527 if heart not in self.incoming_registrations:
527 if heart not in self.incoming_registrations:
528 self.log.info("heartbeat::ignoring new heart: %r"%heart)
528 self.log.info("heartbeat::ignoring new heart: %r"%heart)
529 else:
529 else:
530 self.finish_registration(heart)
530 self.finish_registration(heart)
531
531
532
532
533 def handle_heart_failure(self, heart):
533 def handle_heart_failure(self, heart):
534 """handler to attach to heartbeater.
534 """handler to attach to heartbeater.
535 called when a previously registered heart fails to respond to beat request.
535 called when a previously registered heart fails to respond to beat request.
536 triggers unregistration"""
536 triggers unregistration"""
537 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
537 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
538 eid = self.hearts.get(heart, None)
538 eid = self.hearts.get(heart, None)
539 queue = self.engines[eid].queue
539 queue = self.engines[eid].queue
540 if eid is None:
540 if eid is None:
541 self.log.info("heartbeat::ignoring heart failure %r"%heart)
541 self.log.info("heartbeat::ignoring heart failure %r"%heart)
542 else:
542 else:
543 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
543 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
544
544
545 #----------------------- MUX Queue Traffic ------------------------------
545 #----------------------- MUX Queue Traffic ------------------------------
546
546
547 def save_queue_request(self, idents, msg):
547 def save_queue_request(self, idents, msg):
548 if len(idents) < 2:
548 if len(idents) < 2:
549 self.log.error("invalid identity prefix: %r"%idents)
549 self.log.error("invalid identity prefix: %r"%idents)
550 return
550 return
551 queue_id, client_id = idents[:2]
551 queue_id, client_id = idents[:2]
552 try:
552 try:
553 msg = self.session.unpack_message(msg)
553 msg = self.session.unpack_message(msg)
554 except Exception:
554 except Exception:
555 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
555 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
556 return
556 return
557
557
558 eid = self.by_ident.get(queue_id, None)
558 eid = self.by_ident.get(queue_id, None)
559 if eid is None:
559 if eid is None:
560 self.log.error("queue::target %r not registered"%queue_id)
560 self.log.error("queue::target %r not registered"%queue_id)
561 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
561 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
562 return
562 return
563 record = init_record(msg)
563 record = init_record(msg)
564 msg_id = record['msg_id']
564 msg_id = record['msg_id']
565 record['engine_uuid'] = queue_id
565 record['engine_uuid'] = queue_id
566 record['client_uuid'] = client_id
566 record['client_uuid'] = client_id
567 record['queue'] = 'mux'
567 record['queue'] = 'mux'
568
568
569 try:
569 try:
570 # it's posible iopub arrived first:
570 # it's posible iopub arrived first:
571 existing = self.db.get_record(msg_id)
571 existing = self.db.get_record(msg_id)
572 for key,evalue in existing.iteritems():
572 for key,evalue in existing.iteritems():
573 rvalue = record.get(key, None)
573 rvalue = record.get(key, None)
574 if evalue and rvalue and evalue != rvalue:
574 if evalue and rvalue and evalue != rvalue:
575 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
575 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
576 elif evalue and not rvalue:
576 elif evalue and not rvalue:
577 record[key] = evalue
577 record[key] = evalue
578 try:
578 try:
579 self.db.update_record(msg_id, record)
579 self.db.update_record(msg_id, record)
580 except Exception:
580 except Exception:
581 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
581 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
582 except KeyError:
582 except KeyError:
583 try:
583 try:
584 self.db.add_record(msg_id, record)
584 self.db.add_record(msg_id, record)
585 except Exception:
585 except Exception:
586 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
586 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
587
587
588
588
589 self.pending.add(msg_id)
589 self.pending.add(msg_id)
590 self.queues[eid].append(msg_id)
590 self.queues[eid].append(msg_id)
591
591
592 def save_queue_result(self, idents, msg):
592 def save_queue_result(self, idents, msg):
593 if len(idents) < 2:
593 if len(idents) < 2:
594 self.log.error("invalid identity prefix: %r"%idents)
594 self.log.error("invalid identity prefix: %r"%idents)
595 return
595 return
596
596
597 client_id, queue_id = idents[:2]
597 client_id, queue_id = idents[:2]
598 try:
598 try:
599 msg = self.session.unpack_message(msg)
599 msg = self.session.unpack_message(msg)
600 except Exception:
600 except Exception:
601 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
601 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
602 queue_id,client_id, msg), exc_info=True)
602 queue_id,client_id, msg), exc_info=True)
603 return
603 return
604
604
605 eid = self.by_ident.get(queue_id, None)
605 eid = self.by_ident.get(queue_id, None)
606 if eid is None:
606 if eid is None:
607 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
607 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
608 return
608 return
609
609
610 parent = msg['parent_header']
610 parent = msg['parent_header']
611 if not parent:
611 if not parent:
612 return
612 return
613 msg_id = parent['msg_id']
613 msg_id = parent['msg_id']
614 if msg_id in self.pending:
614 if msg_id in self.pending:
615 self.pending.remove(msg_id)
615 self.pending.remove(msg_id)
616 self.all_completed.add(msg_id)
616 self.all_completed.add(msg_id)
617 self.queues[eid].remove(msg_id)
617 self.queues[eid].remove(msg_id)
618 self.completed[eid].append(msg_id)
618 self.completed[eid].append(msg_id)
619 elif msg_id not in self.all_completed:
619 elif msg_id not in self.all_completed:
620 # it could be a result from a dead engine that died before delivering the
620 # it could be a result from a dead engine that died before delivering the
621 # result
621 # result
622 self.log.warn("queue:: unknown msg finished %r"%msg_id)
622 self.log.warn("queue:: unknown msg finished %r"%msg_id)
623 return
623 return
624 # update record anyway, because the unregistration could have been premature
624 # update record anyway, because the unregistration could have been premature
625 rheader = msg['header']
625 rheader = msg['header']
626 completed = rheader['date']
626 completed = rheader['date']
627 started = rheader.get('started', None)
627 started = rheader.get('started', None)
628 result = {
628 result = {
629 'result_header' : rheader,
629 'result_header' : rheader,
630 'result_content': msg['content'],
630 'result_content': msg['content'],
631 'started' : started,
631 'started' : started,
632 'completed' : completed
632 'completed' : completed
633 }
633 }
634
634
635 result['result_buffers'] = msg['buffers']
635 result['result_buffers'] = msg['buffers']
636 try:
636 try:
637 self.db.update_record(msg_id, result)
637 self.db.update_record(msg_id, result)
638 except Exception:
638 except Exception:
639 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
639 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
640
640
641
641
642 #--------------------- Task Queue Traffic ------------------------------
642 #--------------------- Task Queue Traffic ------------------------------
643
643
644 def save_task_request(self, idents, msg):
644 def save_task_request(self, idents, msg):
645 """Save the submission of a task."""
645 """Save the submission of a task."""
646 client_id = idents[0]
646 client_id = idents[0]
647
647
648 try:
648 try:
649 msg = self.session.unpack_message(msg)
649 msg = self.session.unpack_message(msg)
650 except Exception:
650 except Exception:
651 self.log.error("task::client %r sent invalid task message: %r"%(
651 self.log.error("task::client %r sent invalid task message: %r"%(
652 client_id, msg), exc_info=True)
652 client_id, msg), exc_info=True)
653 return
653 return
654 record = init_record(msg)
654 record = init_record(msg)
655
655
656 record['client_uuid'] = client_id
656 record['client_uuid'] = client_id
657 record['queue'] = 'task'
657 record['queue'] = 'task'
658 header = msg['header']
658 header = msg['header']
659 msg_id = header['msg_id']
659 msg_id = header['msg_id']
660 self.pending.add(msg_id)
660 self.pending.add(msg_id)
661 self.unassigned.add(msg_id)
661 self.unassigned.add(msg_id)
662 try:
662 try:
663 # it's posible iopub arrived first:
663 # it's posible iopub arrived first:
664 existing = self.db.get_record(msg_id)
664 existing = self.db.get_record(msg_id)
665 if existing['resubmitted']:
665 if existing['resubmitted']:
666 for key in ('submitted', 'client_uuid', 'buffers'):
666 for key in ('submitted', 'client_uuid', 'buffers'):
667 # don't clobber these keys on resubmit
667 # don't clobber these keys on resubmit
668 # submitted and client_uuid should be different
668 # submitted and client_uuid should be different
669 # and buffers might be big, and shouldn't have changed
669 # and buffers might be big, and shouldn't have changed
670 record.pop(key)
670 record.pop(key)
671 # still check content,header which should not change
671 # still check content,header which should not change
672 # but are not expensive to compare as buffers
672 # but are not expensive to compare as buffers
673
673
674 for key,evalue in existing.iteritems():
674 for key,evalue in existing.iteritems():
675 if key.endswith('buffers'):
675 if key.endswith('buffers'):
676 # don't compare buffers
676 # don't compare buffers
677 continue
677 continue
678 rvalue = record.get(key, None)
678 rvalue = record.get(key, None)
679 if evalue and rvalue and evalue != rvalue:
679 if evalue and rvalue and evalue != rvalue:
680 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
680 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
681 elif evalue and not rvalue:
681 elif evalue and not rvalue:
682 record[key] = evalue
682 record[key] = evalue
683 try:
683 try:
684 self.db.update_record(msg_id, record)
684 self.db.update_record(msg_id, record)
685 except Exception:
685 except Exception:
686 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
686 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
687 except KeyError:
687 except KeyError:
688 try:
688 try:
689 self.db.add_record(msg_id, record)
689 self.db.add_record(msg_id, record)
690 except Exception:
690 except Exception:
691 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
691 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
692 except Exception:
692 except Exception:
693 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
693 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
694
694
695 def save_task_result(self, idents, msg):
695 def save_task_result(self, idents, msg):
696 """save the result of a completed task."""
696 """save the result of a completed task."""
697 client_id = idents[0]
697 client_id = idents[0]
698 try:
698 try:
699 msg = self.session.unpack_message(msg)
699 msg = self.session.unpack_message(msg)
700 except Exception:
700 except Exception:
701 self.log.error("task::invalid task result message send to %r: %r"%(
701 self.log.error("task::invalid task result message send to %r: %r"%(
702 client_id, msg), exc_info=True)
702 client_id, msg), exc_info=True)
703 return
703 return
704
704
705 parent = msg['parent_header']
705 parent = msg['parent_header']
706 if not parent:
706 if not parent:
707 # print msg
707 # print msg
708 self.log.warn("Task %r had no parent!"%msg)
708 self.log.warn("Task %r had no parent!"%msg)
709 return
709 return
710 msg_id = parent['msg_id']
710 msg_id = parent['msg_id']
711 if msg_id in self.unassigned:
711 if msg_id in self.unassigned:
712 self.unassigned.remove(msg_id)
712 self.unassigned.remove(msg_id)
713
713
714 header = msg['header']
714 header = msg['header']
715 engine_uuid = header.get('engine', None)
715 engine_uuid = header.get('engine', None)
716 eid = self.by_ident.get(engine_uuid, None)
716 eid = self.by_ident.get(engine_uuid, None)
717
717
718 if msg_id in self.pending:
718 if msg_id in self.pending:
719 self.pending.remove(msg_id)
719 self.pending.remove(msg_id)
720 self.all_completed.add(msg_id)
720 self.all_completed.add(msg_id)
721 if eid is not None:
721 if eid is not None:
722 self.completed[eid].append(msg_id)
722 self.completed[eid].append(msg_id)
723 if msg_id in self.tasks[eid]:
723 if msg_id in self.tasks[eid]:
724 self.tasks[eid].remove(msg_id)
724 self.tasks[eid].remove(msg_id)
725 completed = header['date']
725 completed = header['date']
726 started = header.get('started', None)
726 started = header.get('started', None)
727 result = {
727 result = {
728 'result_header' : header,
728 'result_header' : header,
729 'result_content': msg['content'],
729 'result_content': msg['content'],
730 'started' : started,
730 'started' : started,
731 'completed' : completed,
731 'completed' : completed,
732 'engine_uuid': engine_uuid
732 'engine_uuid': engine_uuid
733 }
733 }
734
734
735 result['result_buffers'] = msg['buffers']
735 result['result_buffers'] = msg['buffers']
736 try:
736 try:
737 self.db.update_record(msg_id, result)
737 self.db.update_record(msg_id, result)
738 except Exception:
738 except Exception:
739 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
739 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
740
740
741 else:
741 else:
742 self.log.debug("task::unknown task %r finished"%msg_id)
742 self.log.debug("task::unknown task %r finished"%msg_id)
743
743
744 def save_task_destination(self, idents, msg):
744 def save_task_destination(self, idents, msg):
745 try:
745 try:
746 msg = self.session.unpack_message(msg, content=True)
746 msg = self.session.unpack_message(msg, content=True)
747 except Exception:
747 except Exception:
748 self.log.error("task::invalid task tracking message", exc_info=True)
748 self.log.error("task::invalid task tracking message", exc_info=True)
749 return
749 return
750 content = msg['content']
750 content = msg['content']
751 # print (content)
751 # print (content)
752 msg_id = content['msg_id']
752 msg_id = content['msg_id']
753 engine_uuid = content['engine_id']
753 engine_uuid = content['engine_id']
754 eid = self.by_ident[engine_uuid]
754 eid = self.by_ident[engine_uuid]
755
755
756 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
756 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
757 if msg_id in self.unassigned:
757 if msg_id in self.unassigned:
758 self.unassigned.remove(msg_id)
758 self.unassigned.remove(msg_id)
759 # else:
759 # else:
760 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
760 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
761
761
762 self.tasks[eid].append(msg_id)
762 self.tasks[eid].append(msg_id)
763 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
763 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
764 try:
764 try:
765 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
765 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
766 except Exception:
766 except Exception:
767 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
767 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
768
768
769
769
770 def mia_task_request(self, idents, msg):
770 def mia_task_request(self, idents, msg):
771 raise NotImplementedError
771 raise NotImplementedError
772 client_id = idents[0]
772 client_id = idents[0]
773 # content = dict(mia=self.mia,status='ok')
773 # content = dict(mia=self.mia,status='ok')
774 # self.session.send('mia_reply', content=content, idents=client_id)
774 # self.session.send('mia_reply', content=content, idents=client_id)
775
775
776
776
777 #--------------------- IOPub Traffic ------------------------------
777 #--------------------- IOPub Traffic ------------------------------
778
778
779 def save_iopub_message(self, topics, msg):
779 def save_iopub_message(self, topics, msg):
780 """save an iopub message into the db"""
780 """save an iopub message into the db"""
781 # print (topics)
781 # print (topics)
782 try:
782 try:
783 msg = self.session.unpack_message(msg, content=True)
783 msg = self.session.unpack_message(msg, content=True)
784 except Exception:
784 except Exception:
785 self.log.error("iopub::invalid IOPub message", exc_info=True)
785 self.log.error("iopub::invalid IOPub message", exc_info=True)
786 return
786 return
787
787
788 parent = msg['parent_header']
788 parent = msg['parent_header']
789 if not parent:
789 if not parent:
790 self.log.error("iopub::invalid IOPub message: %r"%msg)
790 self.log.error("iopub::invalid IOPub message: %r"%msg)
791 return
791 return
792 msg_id = parent['msg_id']
792 msg_id = parent['msg_id']
793 msg_type = msg['msg_type']
793 msg_type = msg['msg_type']
794 content = msg['content']
794 content = msg['content']
795
795
796 # ensure msg_id is in db
796 # ensure msg_id is in db
797 try:
797 try:
798 rec = self.db.get_record(msg_id)
798 rec = self.db.get_record(msg_id)
799 except KeyError:
799 except KeyError:
800 rec = empty_record()
800 rec = empty_record()
801 rec['msg_id'] = msg_id
801 rec['msg_id'] = msg_id
802 self.db.add_record(msg_id, rec)
802 self.db.add_record(msg_id, rec)
803 # stream
803 # stream
804 d = {}
804 d = {}
805 if msg_type == 'stream':
805 if msg_type == 'stream':
806 name = content['name']
806 name = content['name']
807 s = rec[name] or ''
807 s = rec[name] or ''
808 d[name] = s + content['data']
808 d[name] = s + content['data']
809
809
810 elif msg_type == 'pyerr':
810 elif msg_type == 'pyerr':
811 d['pyerr'] = content
811 d['pyerr'] = content
812 elif msg_type == 'pyin':
812 elif msg_type == 'pyin':
813 d['pyin'] = content['code']
813 d['pyin'] = content['code']
814 else:
814 else:
815 d[msg_type] = content.get('data', '')
815 d[msg_type] = content.get('data', '')
816
816
817 try:
817 try:
818 self.db.update_record(msg_id, d)
818 self.db.update_record(msg_id, d)
819 except Exception:
819 except Exception:
820 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
820 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
821
821
822
822
823
823
824 #-------------------------------------------------------------------------
824 #-------------------------------------------------------------------------
825 # Registration requests
825 # Registration requests
826 #-------------------------------------------------------------------------
826 #-------------------------------------------------------------------------
827
827
828 def connection_request(self, client_id, msg):
828 def connection_request(self, client_id, msg):
829 """Reply with connection addresses for clients."""
829 """Reply with connection addresses for clients."""
830 self.log.info("client::client %r connected"%client_id)
830 self.log.info("client::client %r connected"%client_id)
831 content = dict(status='ok')
831 content = dict(status='ok')
832 content.update(self.client_info)
832 content.update(self.client_info)
833 jsonable = {}
833 jsonable = {}
834 for k,v in self.keytable.iteritems():
834 for k,v in self.keytable.iteritems():
835 if v not in self.dead_engines:
835 if v not in self.dead_engines:
836 jsonable[str(k)] = v
836 jsonable[str(k)] = v
837 content['engines'] = jsonable
837 content['engines'] = jsonable
838 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
838 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
839
839
840 def register_engine(self, reg, msg):
840 def register_engine(self, reg, msg):
841 """Register a new engine."""
841 """Register a new engine."""
842 content = msg['content']
842 content = msg['content']
843 try:
843 try:
844 queue = content['queue']
844 queue = content['queue']
845 except KeyError:
845 except KeyError:
846 self.log.error("registration::queue not specified", exc_info=True)
846 self.log.error("registration::queue not specified", exc_info=True)
847 return
847 return
848 heart = content.get('heartbeat', None)
848 heart = content.get('heartbeat', None)
849 """register a new engine, and create the socket(s) necessary"""
849 """register a new engine, and create the socket(s) necessary"""
850 eid = self._next_id
850 eid = self._next_id
851 # print (eid, queue, reg, heart)
851 # print (eid, queue, reg, heart)
852
852
853 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
853 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
854
854
855 content = dict(id=eid,status='ok')
855 content = dict(id=eid,status='ok')
856 content.update(self.engine_info)
856 content.update(self.engine_info)
857 # check if requesting available IDs:
857 # check if requesting available IDs:
858 if queue in self.by_ident:
858 if queue in self.by_ident:
859 try:
859 try:
860 raise KeyError("queue_id %r in use"%queue)
860 raise KeyError("queue_id %r in use"%queue)
861 except:
861 except:
862 content = error.wrap_exception()
862 content = error.wrap_exception()
863 self.log.error("queue_id %r in use"%queue, exc_info=True)
863 self.log.error("queue_id %r in use"%queue, exc_info=True)
864 elif heart in self.hearts: # need to check unique hearts?
864 elif heart in self.hearts: # need to check unique hearts?
865 try:
865 try:
866 raise KeyError("heart_id %r in use"%heart)
866 raise KeyError("heart_id %r in use"%heart)
867 except:
867 except:
868 self.log.error("heart_id %r in use"%heart, exc_info=True)
868 self.log.error("heart_id %r in use"%heart, exc_info=True)
869 content = error.wrap_exception()
869 content = error.wrap_exception()
870 else:
870 else:
871 for h, pack in self.incoming_registrations.iteritems():
871 for h, pack in self.incoming_registrations.iteritems():
872 if heart == h:
872 if heart == h:
873 try:
873 try:
874 raise KeyError("heart_id %r in use"%heart)
874 raise KeyError("heart_id %r in use"%heart)
875 except:
875 except:
876 self.log.error("heart_id %r in use"%heart, exc_info=True)
876 self.log.error("heart_id %r in use"%heart, exc_info=True)
877 content = error.wrap_exception()
877 content = error.wrap_exception()
878 break
878 break
879 elif queue == pack[1]:
879 elif queue == pack[1]:
880 try:
880 try:
881 raise KeyError("queue_id %r in use"%queue)
881 raise KeyError("queue_id %r in use"%queue)
882 except:
882 except:
883 self.log.error("queue_id %r in use"%queue, exc_info=True)
883 self.log.error("queue_id %r in use"%queue, exc_info=True)
884 content = error.wrap_exception()
884 content = error.wrap_exception()
885 break
885 break
886
886
887 msg = self.session.send(self.query, "registration_reply",
887 msg = self.session.send(self.query, "registration_reply",
888 content=content,
888 content=content,
889 ident=reg)
889 ident=reg)
890
890
891 if content['status'] == 'ok':
891 if content['status'] == 'ok':
892 if heart in self.heartmonitor.hearts:
892 if heart in self.heartmonitor.hearts:
893 # already beating
893 # already beating
894 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
894 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
895 self.finish_registration(heart)
895 self.finish_registration(heart)
896 else:
896 else:
897 purge = lambda : self._purge_stalled_registration(heart)
897 purge = lambda : self._purge_stalled_registration(heart)
898 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
898 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
899 dc.start()
899 dc.start()
900 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
900 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
901 else:
901 else:
902 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
902 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
903 return eid
903 return eid
904
904
905 def unregister_engine(self, ident, msg):
905 def unregister_engine(self, ident, msg):
906 """Unregister an engine that explicitly requested to leave."""
906 """Unregister an engine that explicitly requested to leave."""
907 try:
907 try:
908 eid = msg['content']['id']
908 eid = msg['content']['id']
909 except:
909 except:
910 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
910 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
911 return
911 return
912 self.log.info("registration::unregister_engine(%r)"%eid)
912 self.log.info("registration::unregister_engine(%r)"%eid)
913 # print (eid)
913 # print (eid)
914 uuid = self.keytable[eid]
914 uuid = self.keytable[eid]
915 content=dict(id=eid, queue=uuid)
915 content=dict(id=eid, queue=uuid)
916 self.dead_engines.add(uuid)
916 self.dead_engines.add(uuid)
917 # self.ids.remove(eid)
917 # self.ids.remove(eid)
918 # uuid = self.keytable.pop(eid)
918 # uuid = self.keytable.pop(eid)
919 #
919 #
920 # ec = self.engines.pop(eid)
920 # ec = self.engines.pop(eid)
921 # self.hearts.pop(ec.heartbeat)
921 # self.hearts.pop(ec.heartbeat)
922 # self.by_ident.pop(ec.queue)
922 # self.by_ident.pop(ec.queue)
923 # self.completed.pop(eid)
923 # self.completed.pop(eid)
924 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
924 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
925 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
925 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
926 dc.start()
926 dc.start()
927 ############## TODO: HANDLE IT ################
927 ############## TODO: HANDLE IT ################
928
928
929 if self.notifier:
929 if self.notifier:
930 self.session.send(self.notifier, "unregistration_notification", content=content)
930 self.session.send(self.notifier, "unregistration_notification", content=content)
931
931
932 def _handle_stranded_msgs(self, eid, uuid):
932 def _handle_stranded_msgs(self, eid, uuid):
933 """Handle messages known to be on an engine when the engine unregisters.
933 """Handle messages known to be on an engine when the engine unregisters.
934
934
935 It is possible that this will fire prematurely - that is, an engine will
935 It is possible that this will fire prematurely - that is, an engine will
936 go down after completing a result, and the client will be notified
936 go down after completing a result, and the client will be notified
937 that the result failed and later receive the actual result.
937 that the result failed and later receive the actual result.
938 """
938 """
939
939
940 outstanding = self.queues[eid]
940 outstanding = self.queues[eid]
941
941
942 for msg_id in outstanding:
942 for msg_id in outstanding:
943 self.pending.remove(msg_id)
943 self.pending.remove(msg_id)
944 self.all_completed.add(msg_id)
944 self.all_completed.add(msg_id)
945 try:
945 try:
946 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
946 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
947 except:
947 except:
948 content = error.wrap_exception()
948 content = error.wrap_exception()
949 # build a fake header:
949 # build a fake header:
950 header = {}
950 header = {}
951 header['engine'] = uuid
951 header['engine'] = uuid
952 header['date'] = datetime.now()
952 header['date'] = datetime.now()
953 rec = dict(result_content=content, result_header=header, result_buffers=[])
953 rec = dict(result_content=content, result_header=header, result_buffers=[])
954 rec['completed'] = header['date']
954 rec['completed'] = header['date']
955 rec['engine_uuid'] = uuid
955 rec['engine_uuid'] = uuid
956 try:
956 try:
957 self.db.update_record(msg_id, rec)
957 self.db.update_record(msg_id, rec)
958 except Exception:
958 except Exception:
959 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
959 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
960
960
961
961
962 def finish_registration(self, heart):
962 def finish_registration(self, heart):
963 """Second half of engine registration, called after our HeartMonitor
963 """Second half of engine registration, called after our HeartMonitor
964 has received a beat from the Engine's Heart."""
964 has received a beat from the Engine's Heart."""
965 try:
965 try:
966 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
966 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
967 except KeyError:
967 except KeyError:
968 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
968 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
969 return
969 return
970 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
970 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
971 if purge is not None:
971 if purge is not None:
972 purge.stop()
972 purge.stop()
973 control = queue
973 control = queue
974 self.ids.add(eid)
974 self.ids.add(eid)
975 self.keytable[eid] = queue
975 self.keytable[eid] = queue
976 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
976 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
977 control=control, heartbeat=heart)
977 control=control, heartbeat=heart)
978 self.by_ident[queue] = eid
978 self.by_ident[queue] = eid
979 self.queues[eid] = list()
979 self.queues[eid] = list()
980 self.tasks[eid] = list()
980 self.tasks[eid] = list()
981 self.completed[eid] = list()
981 self.completed[eid] = list()
982 self.hearts[heart] = eid
982 self.hearts[heart] = eid
983 content = dict(id=eid, queue=self.engines[eid].queue)
983 content = dict(id=eid, queue=self.engines[eid].queue)
984 if self.notifier:
984 if self.notifier:
985 self.session.send(self.notifier, "registration_notification", content=content)
985 self.session.send(self.notifier, "registration_notification", content=content)
986 self.log.info("engine::Engine Connected: %i"%eid)
986 self.log.info("engine::Engine Connected: %i"%eid)
987
987
988 def _purge_stalled_registration(self, heart):
988 def _purge_stalled_registration(self, heart):
989 if heart in self.incoming_registrations:
989 if heart in self.incoming_registrations:
990 eid = self.incoming_registrations.pop(heart)[0]
990 eid = self.incoming_registrations.pop(heart)[0]
991 self.log.info("registration::purging stalled registration: %i"%eid)
991 self.log.info("registration::purging stalled registration: %i"%eid)
992 else:
992 else:
993 pass
993 pass
994
994
995 #-------------------------------------------------------------------------
995 #-------------------------------------------------------------------------
996 # Client Requests
996 # Client Requests
997 #-------------------------------------------------------------------------
997 #-------------------------------------------------------------------------
998
998
999 def shutdown_request(self, client_id, msg):
999 def shutdown_request(self, client_id, msg):
1000 """handle shutdown request."""
1000 """handle shutdown request."""
1001 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1001 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1002 # also notify other clients of shutdown
1002 # also notify other clients of shutdown
1003 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1003 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1004 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1004 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1005 dc.start()
1005 dc.start()
1006
1006
1007 def _shutdown(self):
1007 def _shutdown(self):
1008 self.log.info("hub::hub shutting down.")
1008 self.log.info("hub::hub shutting down.")
1009 time.sleep(0.1)
1009 time.sleep(0.1)
1010 sys.exit(0)
1010 sys.exit(0)
1011
1011
1012
1012
1013 def check_load(self, client_id, msg):
1013 def check_load(self, client_id, msg):
1014 content = msg['content']
1014 content = msg['content']
1015 try:
1015 try:
1016 targets = content['targets']
1016 targets = content['targets']
1017 targets = self._validate_targets(targets)
1017 targets = self._validate_targets(targets)
1018 except:
1018 except:
1019 content = error.wrap_exception()
1019 content = error.wrap_exception()
1020 self.session.send(self.query, "hub_error",
1020 self.session.send(self.query, "hub_error",
1021 content=content, ident=client_id)
1021 content=content, ident=client_id)
1022 return
1022 return
1023
1023
1024 content = dict(status='ok')
1024 content = dict(status='ok')
1025 # loads = {}
1025 # loads = {}
1026 for t in targets:
1026 for t in targets:
1027 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1027 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1028 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1028 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1029
1029
1030
1030
1031 def queue_status(self, client_id, msg):
1031 def queue_status(self, client_id, msg):
1032 """Return the Queue status of one or more targets.
1032 """Return the Queue status of one or more targets.
1033 if verbose: return the msg_ids
1033 if verbose: return the msg_ids
1034 else: return len of each type.
1034 else: return len of each type.
1035 keys: queue (pending MUX jobs)
1035 keys: queue (pending MUX jobs)
1036 tasks (pending Task jobs)
1036 tasks (pending Task jobs)
1037 completed (finished jobs from both queues)"""
1037 completed (finished jobs from both queues)"""
1038 content = msg['content']
1038 content = msg['content']
1039 targets = content['targets']
1039 targets = content['targets']
1040 try:
1040 try:
1041 targets = self._validate_targets(targets)
1041 targets = self._validate_targets(targets)
1042 except:
1042 except:
1043 content = error.wrap_exception()
1043 content = error.wrap_exception()
1044 self.session.send(self.query, "hub_error",
1044 self.session.send(self.query, "hub_error",
1045 content=content, ident=client_id)
1045 content=content, ident=client_id)
1046 return
1046 return
1047 verbose = content.get('verbose', False)
1047 verbose = content.get('verbose', False)
1048 content = dict(status='ok')
1048 content = dict(status='ok')
1049 for t in targets:
1049 for t in targets:
1050 queue = self.queues[t]
1050 queue = self.queues[t]
1051 completed = self.completed[t]
1051 completed = self.completed[t]
1052 tasks = self.tasks[t]
1052 tasks = self.tasks[t]
1053 if not verbose:
1053 if not verbose:
1054 queue = len(queue)
1054 queue = len(queue)
1055 completed = len(completed)
1055 completed = len(completed)
1056 tasks = len(tasks)
1056 tasks = len(tasks)
1057 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1057 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1058 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1058 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1059
1059
1060 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1060 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1061
1061
1062 def purge_results(self, client_id, msg):
1062 def purge_results(self, client_id, msg):
1063 """Purge results from memory. This method is more valuable before we move
1063 """Purge results from memory. This method is more valuable before we move
1064 to a DB based message storage mechanism."""
1064 to a DB based message storage mechanism."""
1065 content = msg['content']
1065 content = msg['content']
1066 msg_ids = content.get('msg_ids', [])
1066 msg_ids = content.get('msg_ids', [])
1067 reply = dict(status='ok')
1067 reply = dict(status='ok')
1068 if msg_ids == 'all':
1068 if msg_ids == 'all':
1069 try:
1069 try:
1070 self.db.drop_matching_records(dict(completed={'$ne':None}))
1070 self.db.drop_matching_records(dict(completed={'$ne':None}))
1071 except Exception:
1071 except Exception:
1072 reply = error.wrap_exception()
1072 reply = error.wrap_exception()
1073 else:
1073 else:
1074 pending = filter(lambda m: m in self.pending, msg_ids)
1074 pending = filter(lambda m: m in self.pending, msg_ids)
1075 if pending:
1075 if pending:
1076 try:
1076 try:
1077 raise IndexError("msg pending: %r"%pending[0])
1077 raise IndexError("msg pending: %r"%pending[0])
1078 except:
1078 except:
1079 reply = error.wrap_exception()
1079 reply = error.wrap_exception()
1080 else:
1080 else:
1081 try:
1081 try:
1082 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1082 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1083 except Exception:
1083 except Exception:
1084 reply = error.wrap_exception()
1084 reply = error.wrap_exception()
1085
1085
1086 if reply['status'] == 'ok':
1086 if reply['status'] == 'ok':
1087 eids = content.get('engine_ids', [])
1087 eids = content.get('engine_ids', [])
1088 for eid in eids:
1088 for eid in eids:
1089 if eid not in self.engines:
1089 if eid not in self.engines:
1090 try:
1090 try:
1091 raise IndexError("No such engine: %i"%eid)
1091 raise IndexError("No such engine: %i"%eid)
1092 except:
1092 except:
1093 reply = error.wrap_exception()
1093 reply = error.wrap_exception()
1094 break
1094 break
1095 msg_ids = self.completed.pop(eid)
1095 msg_ids = self.completed.pop(eid)
1096 uid = self.engines[eid].queue
1096 uid = self.engines[eid].queue
1097 try:
1097 try:
1098 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1098 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1099 except Exception:
1099 except Exception:
1100 reply = error.wrap_exception()
1100 reply = error.wrap_exception()
1101 break
1101 break
1102
1102
1103 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1103 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1104
1104
1105 def resubmit_task(self, client_id, msg):
1105 def resubmit_task(self, client_id, msg):
1106 """Resubmit one or more tasks."""
1106 """Resubmit one or more tasks."""
1107 def finish(reply):
1107 def finish(reply):
1108 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1108 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1109
1109
1110 content = msg['content']
1110 content = msg['content']
1111 msg_ids = content['msg_ids']
1111 msg_ids = content['msg_ids']
1112 reply = dict(status='ok')
1112 reply = dict(status='ok')
1113 try:
1113 try:
1114 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1114 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1115 'header', 'content', 'buffers'])
1115 'header', 'content', 'buffers'])
1116 except Exception:
1116 except Exception:
1117 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1117 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1118 return finish(error.wrap_exception())
1118 return finish(error.wrap_exception())
1119
1119
1120 # validate msg_ids
1120 # validate msg_ids
1121 found_ids = [ rec['msg_id'] for rec in records ]
1121 found_ids = [ rec['msg_id'] for rec in records ]
1122 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1122 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1123 if len(records) > len(msg_ids):
1123 if len(records) > len(msg_ids):
1124 try:
1124 try:
1125 raise RuntimeError("DB appears to be in an inconsistent state."
1125 raise RuntimeError("DB appears to be in an inconsistent state."
1126 "More matching records were found than should exist")
1126 "More matching records were found than should exist")
1127 except Exception:
1127 except Exception:
1128 return finish(error.wrap_exception())
1128 return finish(error.wrap_exception())
1129 elif len(records) < len(msg_ids):
1129 elif len(records) < len(msg_ids):
1130 missing = [ m for m in msg_ids if m not in found_ids ]
1130 missing = [ m for m in msg_ids if m not in found_ids ]
1131 try:
1131 try:
1132 raise KeyError("No such msg(s): %r"%missing)
1132 raise KeyError("No such msg(s): %r"%missing)
1133 except KeyError:
1133 except KeyError:
1134 return finish(error.wrap_exception())
1134 return finish(error.wrap_exception())
1135 elif invalid_ids:
1135 elif invalid_ids:
1136 msg_id = invalid_ids[0]
1136 msg_id = invalid_ids[0]
1137 try:
1137 try:
1138 raise ValueError("Task %r appears to be inflight"%(msg_id))
1138 raise ValueError("Task %r appears to be inflight"%(msg_id))
1139 except Exception:
1139 except Exception:
1140 return finish(error.wrap_exception())
1140 return finish(error.wrap_exception())
1141
1141
1142 # clear the existing records
1142 # clear the existing records
1143 now = datetime.now()
1143 now = datetime.now()
1144 rec = empty_record()
1144 rec = empty_record()
1145 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1145 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1146 rec['resubmitted'] = now
1146 rec['resubmitted'] = now
1147 rec['queue'] = 'task'
1147 rec['queue'] = 'task'
1148 rec['client_uuid'] = client_id[0]
1148 rec['client_uuid'] = client_id[0]
1149 try:
1149 try:
1150 for msg_id in msg_ids:
1150 for msg_id in msg_ids:
1151 self.all_completed.discard(msg_id)
1151 self.all_completed.discard(msg_id)
1152 self.db.update_record(msg_id, rec)
1152 self.db.update_record(msg_id, rec)
1153 except Exception:
1153 except Exception:
1154 self.log.error('db::db error upating record', exc_info=True)
1154 self.log.error('db::db error upating record', exc_info=True)
1155 reply = error.wrap_exception()
1155 reply = error.wrap_exception()
1156 else:
1156 else:
1157 # send the messages
1157 # send the messages
1158 for rec in records:
1158 for rec in records:
1159 header = rec['header']
1159 header = rec['header']
1160 # include resubmitted in header to prevent digest collision
1160 # include resubmitted in header to prevent digest collision
1161 header['resubmitted'] = now
1161 header['resubmitted'] = now
1162 msg = self.session.msg(header['msg_type'])
1162 msg = self.session.msg(header['msg_type'])
1163 msg['content'] = rec['content']
1163 msg['content'] = rec['content']
1164 msg['header'] = header
1164 msg['header'] = header
1165 msg['msg_id'] = rec['msg_id']
1165 msg['msg_id'] = rec['msg_id']
1166 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1166 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1167
1167
1168 finish(dict(status='ok'))
1168 finish(dict(status='ok'))
1169
1169
1170
1170
1171 def _extract_record(self, rec):
1171 def _extract_record(self, rec):
1172 """decompose a TaskRecord dict into subsection of reply for get_result"""
1172 """decompose a TaskRecord dict into subsection of reply for get_result"""
1173 io_dict = {}
1173 io_dict = {}
1174 for key in 'pyin pyout pyerr stdout stderr'.split():
1174 for key in 'pyin pyout pyerr stdout stderr'.split():
1175 io_dict[key] = rec[key]
1175 io_dict[key] = rec[key]
1176 content = { 'result_content': rec['result_content'],
1176 content = { 'result_content': rec['result_content'],
1177 'header': rec['header'],
1177 'header': rec['header'],
1178 'result_header' : rec['result_header'],
1178 'result_header' : rec['result_header'],
1179 'io' : io_dict,
1179 'io' : io_dict,
1180 }
1180 }
1181 if rec['result_buffers']:
1181 if rec['result_buffers']:
1182 buffers = map(str, rec['result_buffers'])
1182 buffers = map(str, rec['result_buffers'])
1183 else:
1183 else:
1184 buffers = []
1184 buffers = []
1185
1185
1186 return content, buffers
1186 return content, buffers
1187
1187
1188 def get_results(self, client_id, msg):
1188 def get_results(self, client_id, msg):
1189 """Get the result of 1 or more messages."""
1189 """Get the result of 1 or more messages."""
1190 content = msg['content']
1190 content = msg['content']
1191 msg_ids = sorted(set(content['msg_ids']))
1191 msg_ids = sorted(set(content['msg_ids']))
1192 statusonly = content.get('status_only', False)
1192 statusonly = content.get('status_only', False)
1193 pending = []
1193 pending = []
1194 completed = []
1194 completed = []
1195 content = dict(status='ok')
1195 content = dict(status='ok')
1196 content['pending'] = pending
1196 content['pending'] = pending
1197 content['completed'] = completed
1197 content['completed'] = completed
1198 buffers = []
1198 buffers = []
1199 if not statusonly:
1199 if not statusonly:
1200 try:
1200 try:
1201 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1201 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1202 # turn match list into dict, for faster lookup
1202 # turn match list into dict, for faster lookup
1203 records = {}
1203 records = {}
1204 for rec in matches:
1204 for rec in matches:
1205 records[rec['msg_id']] = rec
1205 records[rec['msg_id']] = rec
1206 except Exception:
1206 except Exception:
1207 content = error.wrap_exception()
1207 content = error.wrap_exception()
1208 self.session.send(self.query, "result_reply", content=content,
1208 self.session.send(self.query, "result_reply", content=content,
1209 parent=msg, ident=client_id)
1209 parent=msg, ident=client_id)
1210 return
1210 return
1211 else:
1211 else:
1212 records = {}
1212 records = {}
1213 for msg_id in msg_ids:
1213 for msg_id in msg_ids:
1214 if msg_id in self.pending:
1214 if msg_id in self.pending:
1215 pending.append(msg_id)
1215 pending.append(msg_id)
1216 elif msg_id in self.all_completed:
1216 elif msg_id in self.all_completed:
1217 completed.append(msg_id)
1217 completed.append(msg_id)
1218 if not statusonly:
1218 if not statusonly:
1219 c,bufs = self._extract_record(records[msg_id])
1219 c,bufs = self._extract_record(records[msg_id])
1220 content[msg_id] = c
1220 content[msg_id] = c
1221 buffers.extend(bufs)
1221 buffers.extend(bufs)
1222 elif msg_id in records:
1222 elif msg_id in records:
1223 if rec['completed']:
1223 if rec['completed']:
1224 completed.append(msg_id)
1224 completed.append(msg_id)
1225 c,bufs = self._extract_record(records[msg_id])
1225 c,bufs = self._extract_record(records[msg_id])
1226 content[msg_id] = c
1226 content[msg_id] = c
1227 buffers.extend(bufs)
1227 buffers.extend(bufs)
1228 else:
1228 else:
1229 pending.append(msg_id)
1229 pending.append(msg_id)
1230 else:
1230 else:
1231 try:
1231 try:
1232 raise KeyError('No such message: '+msg_id)
1232 raise KeyError('No such message: '+msg_id)
1233 except:
1233 except:
1234 content = error.wrap_exception()
1234 content = error.wrap_exception()
1235 break
1235 break
1236 self.session.send(self.query, "result_reply", content=content,
1236 self.session.send(self.query, "result_reply", content=content,
1237 parent=msg, ident=client_id,
1237 parent=msg, ident=client_id,
1238 buffers=buffers)
1238 buffers=buffers)
1239
1239
1240 def get_history(self, client_id, msg):
1240 def get_history(self, client_id, msg):
1241 """Get a list of all msg_ids in our DB records"""
1241 """Get a list of all msg_ids in our DB records"""
1242 try:
1242 try:
1243 msg_ids = self.db.get_history()
1243 msg_ids = self.db.get_history()
1244 except Exception as e:
1244 except Exception as e:
1245 content = error.wrap_exception()
1245 content = error.wrap_exception()
1246 else:
1246 else:
1247 content = dict(status='ok', history=msg_ids)
1247 content = dict(status='ok', history=msg_ids)
1248
1248
1249 self.session.send(self.query, "history_reply", content=content,
1249 self.session.send(self.query, "history_reply", content=content,
1250 parent=msg, ident=client_id)
1250 parent=msg, ident=client_id)
1251
1251
1252 def db_query(self, client_id, msg):
1252 def db_query(self, client_id, msg):
1253 """Perform a raw query on the task record database."""
1253 """Perform a raw query on the task record database."""
1254 content = msg['content']
1254 content = msg['content']
1255 query = content.get('query', {})
1255 query = content.get('query', {})
1256 keys = content.get('keys', None)
1256 keys = content.get('keys', None)
1257 buffers = []
1257 buffers = []
1258 empty = list()
1258 empty = list()
1259 try:
1259 try:
1260 records = self.db.find_records(query, keys)
1260 records = self.db.find_records(query, keys)
1261 except Exception as e:
1261 except Exception as e:
1262 content = error.wrap_exception()
1262 content = error.wrap_exception()
1263 else:
1263 else:
1264 # extract buffers from reply content:
1264 # extract buffers from reply content:
1265 if keys is not None:
1265 if keys is not None:
1266 buffer_lens = [] if 'buffers' in keys else None
1266 buffer_lens = [] if 'buffers' in keys else None
1267 result_buffer_lens = [] if 'result_buffers' in keys else None
1267 result_buffer_lens = [] if 'result_buffers' in keys else None
1268 else:
1268 else:
1269 buffer_lens = []
1269 buffer_lens = []
1270 result_buffer_lens = []
1270 result_buffer_lens = []
1271
1271
1272 for rec in records:
1272 for rec in records:
1273 # buffers may be None, so double check
1273 # buffers may be None, so double check
1274 if buffer_lens is not None:
1274 if buffer_lens is not None:
1275 b = rec.pop('buffers', empty) or empty
1275 b = rec.pop('buffers', empty) or empty
1276 buffer_lens.append(len(b))
1276 buffer_lens.append(len(b))
1277 buffers.extend(b)
1277 buffers.extend(b)
1278 if result_buffer_lens is not None:
1278 if result_buffer_lens is not None:
1279 rb = rec.pop('result_buffers', empty) or empty
1279 rb = rec.pop('result_buffers', empty) or empty
1280 result_buffer_lens.append(len(rb))
1280 result_buffer_lens.append(len(rb))
1281 buffers.extend(rb)
1281 buffers.extend(rb)
1282 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1282 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1283 result_buffer_lens=result_buffer_lens)
1283 result_buffer_lens=result_buffer_lens)
1284
1284
1285 self.session.send(self.query, "db_reply", content=content,
1285 self.session.send(self.query, "db_reply", content=content,
1286 parent=msg, ident=client_id,
1286 parent=msg, ident=client_id,
1287 buffers=buffers)
1287 buffers=buffers)
1288
1288
@@ -1,215 +1,216 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """An Application for launching a kernel
2 """An Application for launching a kernel
3
3
4 Authors
4 Authors
5 -------
5 -------
6 * MinRK
6 * MinRK
7 """
7 """
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING.txt, distributed as part of this software.
12 # the file COPYING.txt, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 # Standard library imports.
19 # Standard library imports.
20 import os
20 import os
21 import sys
21 import sys
22
22
23 # System library imports.
23 # System library imports.
24 import zmq
24 import zmq
25
25
26 # IPython imports.
26 # IPython imports.
27 from IPython.core.ultratb import FormattedTB
27 from IPython.core.ultratb import FormattedTB
28 from IPython.core.application import (
28 from IPython.core.application import (
29 BaseIPythonApplication, base_flags, base_aliases
29 BaseIPythonApplication, base_flags, base_aliases
30 )
30 )
31 from IPython.utils import io
31 from IPython.utils import io
32 from IPython.utils.localinterfaces import LOCALHOST
32 from IPython.utils.localinterfaces import LOCALHOST
33 from IPython.utils.traitlets import Any, Instance, Dict, Unicode, Int, Bool
33 from IPython.utils.traitlets import (Any, Instance, Dict, Unicode, Int, Bool,
34 DottedObjectName)
34 from IPython.utils.importstring import import_item
35 from IPython.utils.importstring import import_item
35 # local imports
36 # local imports
36 from IPython.zmq.heartbeat import Heartbeat
37 from IPython.zmq.heartbeat import Heartbeat
37 from IPython.zmq.parentpoller import ParentPollerUnix, ParentPollerWindows
38 from IPython.zmq.parentpoller import ParentPollerUnix, ParentPollerWindows
38 from IPython.zmq.session import Session
39 from IPython.zmq.session import Session
39
40
40
41
41 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
42 # Flags and Aliases
43 # Flags and Aliases
43 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
44
45
45 kernel_aliases = dict(base_aliases)
46 kernel_aliases = dict(base_aliases)
46 kernel_aliases.update({
47 kernel_aliases.update({
47 'ip' : 'KernelApp.ip',
48 'ip' : 'KernelApp.ip',
48 'hb' : 'KernelApp.hb_port',
49 'hb' : 'KernelApp.hb_port',
49 'shell' : 'KernelApp.shell_port',
50 'shell' : 'KernelApp.shell_port',
50 'iopub' : 'KernelApp.iopub_port',
51 'iopub' : 'KernelApp.iopub_port',
51 'stdin' : 'KernelApp.stdin_port',
52 'stdin' : 'KernelApp.stdin_port',
52 'parent': 'KernelApp.parent',
53 'parent': 'KernelApp.parent',
53 })
54 })
54 if sys.platform.startswith('win'):
55 if sys.platform.startswith('win'):
55 kernel_aliases['interrupt'] = 'KernelApp.interrupt'
56 kernel_aliases['interrupt'] = 'KernelApp.interrupt'
56
57
57 kernel_flags = dict(base_flags)
58 kernel_flags = dict(base_flags)
58 kernel_flags.update({
59 kernel_flags.update({
59 'no-stdout' : (
60 'no-stdout' : (
60 {'KernelApp' : {'no_stdout' : True}},
61 {'KernelApp' : {'no_stdout' : True}},
61 "redirect stdout to the null device"),
62 "redirect stdout to the null device"),
62 'no-stderr' : (
63 'no-stderr' : (
63 {'KernelApp' : {'no_stderr' : True}},
64 {'KernelApp' : {'no_stderr' : True}},
64 "redirect stderr to the null device"),
65 "redirect stderr to the null device"),
65 })
66 })
66
67
67
68
68 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
69 # Application class for starting a Kernel
70 # Application class for starting a Kernel
70 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
71
72
72 class KernelApp(BaseIPythonApplication):
73 class KernelApp(BaseIPythonApplication):
73 name='pykernel'
74 name='pykernel'
74 aliases = Dict(kernel_aliases)
75 aliases = Dict(kernel_aliases)
75 flags = Dict(kernel_flags)
76 flags = Dict(kernel_flags)
76 classes = [Session]
77 classes = [Session]
77 # the kernel class, as an importstring
78 # the kernel class, as an importstring
78 kernel_class = Unicode('IPython.zmq.pykernel.Kernel')
79 kernel_class = DottedObjectName('IPython.zmq.pykernel.Kernel')
79 kernel = Any()
80 kernel = Any()
80 poller = Any() # don't restrict this even though current pollers are all Threads
81 poller = Any() # don't restrict this even though current pollers are all Threads
81 heartbeat = Instance(Heartbeat)
82 heartbeat = Instance(Heartbeat)
82 session = Instance('IPython.zmq.session.Session')
83 session = Instance('IPython.zmq.session.Session')
83 ports = Dict()
84 ports = Dict()
84
85
85 # connection info:
86 # connection info:
86 ip = Unicode(LOCALHOST, config=True,
87 ip = Unicode(LOCALHOST, config=True,
87 help="Set the IP or interface on which the kernel will listen.")
88 help="Set the IP or interface on which the kernel will listen.")
88 hb_port = Int(0, config=True, help="set the heartbeat port [default: random]")
89 hb_port = Int(0, config=True, help="set the heartbeat port [default: random]")
89 shell_port = Int(0, config=True, help="set the shell (XREP) port [default: random]")
90 shell_port = Int(0, config=True, help="set the shell (XREP) port [default: random]")
90 iopub_port = Int(0, config=True, help="set the iopub (PUB) port [default: random]")
91 iopub_port = Int(0, config=True, help="set the iopub (PUB) port [default: random]")
91 stdin_port = Int(0, config=True, help="set the stdin (XREQ) port [default: random]")
92 stdin_port = Int(0, config=True, help="set the stdin (XREQ) port [default: random]")
92
93
93 # streams, etc.
94 # streams, etc.
94 no_stdout = Bool(False, config=True, help="redirect stdout to the null device")
95 no_stdout = Bool(False, config=True, help="redirect stdout to the null device")
95 no_stderr = Bool(False, config=True, help="redirect stderr to the null device")
96 no_stderr = Bool(False, config=True, help="redirect stderr to the null device")
96 outstream_class = Unicode('IPython.zmq.iostream.OutStream', config=True,
97 outstream_class = DottedObjectName('IPython.zmq.iostream.OutStream',
97 help="The importstring for the OutStream factory")
98 config=True, help="The importstring for the OutStream factory")
98 displayhook_class = Unicode('IPython.zmq.displayhook.DisplayHook', config=True,
99 displayhook_class = DottedObjectName('IPython.zmq.displayhook.DisplayHook',
99 help="The importstring for the DisplayHook factory")
100 config=True, help="The importstring for the DisplayHook factory")
100
101
101 # polling
102 # polling
102 parent = Int(0, config=True,
103 parent = Int(0, config=True,
103 help="""kill this process if its parent dies. On Windows, the argument
104 help="""kill this process if its parent dies. On Windows, the argument
104 specifies the HANDLE of the parent process, otherwise it is simply boolean.
105 specifies the HANDLE of the parent process, otherwise it is simply boolean.
105 """)
106 """)
106 interrupt = Int(0, config=True,
107 interrupt = Int(0, config=True,
107 help="""ONLY USED ON WINDOWS
108 help="""ONLY USED ON WINDOWS
108 Interrupt this process when the parent is signalled.
109 Interrupt this process when the parent is signalled.
109 """)
110 """)
110
111
111 def init_crash_handler(self):
112 def init_crash_handler(self):
112 # Install minimal exception handling
113 # Install minimal exception handling
113 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
114 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
114 ostream=sys.__stdout__)
115 ostream=sys.__stdout__)
115
116
116 def init_poller(self):
117 def init_poller(self):
117 if sys.platform == 'win32':
118 if sys.platform == 'win32':
118 if self.interrupt or self.parent:
119 if self.interrupt or self.parent:
119 self.poller = ParentPollerWindows(self.interrupt, self.parent)
120 self.poller = ParentPollerWindows(self.interrupt, self.parent)
120 elif self.parent:
121 elif self.parent:
121 self.poller = ParentPollerUnix()
122 self.poller = ParentPollerUnix()
122
123
123 def _bind_socket(self, s, port):
124 def _bind_socket(self, s, port):
124 iface = 'tcp://%s' % self.ip
125 iface = 'tcp://%s' % self.ip
125 if port <= 0:
126 if port <= 0:
126 port = s.bind_to_random_port(iface)
127 port = s.bind_to_random_port(iface)
127 else:
128 else:
128 s.bind(iface + ':%i'%port)
129 s.bind(iface + ':%i'%port)
129 return port
130 return port
130
131
131 def init_sockets(self):
132 def init_sockets(self):
132 # Create a context, a session, and the kernel sockets.
133 # Create a context, a session, and the kernel sockets.
133 io.raw_print("Starting the kernel at pid:", os.getpid())
134 io.raw_print("Starting the kernel at pid:", os.getpid())
134 context = zmq.Context.instance()
135 context = zmq.Context.instance()
135 # Uncomment this to try closing the context.
136 # Uncomment this to try closing the context.
136 # atexit.register(context.term)
137 # atexit.register(context.term)
137
138
138 self.shell_socket = context.socket(zmq.XREP)
139 self.shell_socket = context.socket(zmq.XREP)
139 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
140 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
140 self.log.debug("shell XREP Channel on port: %i"%self.shell_port)
141 self.log.debug("shell XREP Channel on port: %i"%self.shell_port)
141
142
142 self.iopub_socket = context.socket(zmq.PUB)
143 self.iopub_socket = context.socket(zmq.PUB)
143 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
144 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
144 self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port)
145 self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port)
145
146
146 self.stdin_socket = context.socket(zmq.XREQ)
147 self.stdin_socket = context.socket(zmq.XREQ)
147 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
148 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
148 self.log.debug("stdin XREQ Channel on port: %i"%self.stdin_port)
149 self.log.debug("stdin XREQ Channel on port: %i"%self.stdin_port)
149
150
150 self.heartbeat = Heartbeat(context, (self.ip, self.hb_port))
151 self.heartbeat = Heartbeat(context, (self.ip, self.hb_port))
151 self.hb_port = self.heartbeat.port
152 self.hb_port = self.heartbeat.port
152 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
153 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
153
154
154 # Helper to make it easier to connect to an existing kernel, until we have
155 # Helper to make it easier to connect to an existing kernel, until we have
155 # single-port connection negotiation fully implemented.
156 # single-port connection negotiation fully implemented.
156 self.log.info("To connect another client to this kernel, use:")
157 self.log.info("To connect another client to this kernel, use:")
157 self.log.info("--external shell={0} iopub={1} stdin={2} hb={3}".format(
158 self.log.info("--external shell={0} iopub={1} stdin={2} hb={3}".format(
158 self.shell_port, self.iopub_port, self.stdin_port, self.hb_port))
159 self.shell_port, self.iopub_port, self.stdin_port, self.hb_port))
159
160
160
161
161 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
162 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
162 stdin=self.stdin_port, hb=self.hb_port)
163 stdin=self.stdin_port, hb=self.hb_port)
163
164
164 def init_session(self):
165 def init_session(self):
165 """create our session object"""
166 """create our session object"""
166 self.session = Session(config=self.config, username=u'kernel')
167 self.session = Session(config=self.config, username=u'kernel')
167
168
168 def init_io(self):
169 def init_io(self):
169 """redirects stdout/stderr, and installs a display hook"""
170 """redirects stdout/stderr, and installs a display hook"""
170 # Re-direct stdout/stderr, if necessary.
171 # Re-direct stdout/stderr, if necessary.
171 if self.no_stdout or self.no_stderr:
172 if self.no_stdout or self.no_stderr:
172 blackhole = file(os.devnull, 'w')
173 blackhole = file(os.devnull, 'w')
173 if self.no_stdout:
174 if self.no_stdout:
174 sys.stdout = sys.__stdout__ = blackhole
175 sys.stdout = sys.__stdout__ = blackhole
175 if self.no_stderr:
176 if self.no_stderr:
176 sys.stderr = sys.__stderr__ = blackhole
177 sys.stderr = sys.__stderr__ = blackhole
177
178
178 # Redirect input streams and set a display hook.
179 # Redirect input streams and set a display hook.
179
180
180 if self.outstream_class:
181 if self.outstream_class:
181 outstream_factory = import_item(str(self.outstream_class))
182 outstream_factory = import_item(str(self.outstream_class))
182 sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
183 sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
183 sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
184 sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
184 if self.displayhook_class:
185 if self.displayhook_class:
185 displayhook_factory = import_item(str(self.displayhook_class))
186 displayhook_factory = import_item(str(self.displayhook_class))
186 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
187 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
187
188
188 def init_kernel(self):
189 def init_kernel(self):
189 """Create the Kernel object itself"""
190 """Create the Kernel object itself"""
190 kernel_factory = import_item(str(self.kernel_class))
191 kernel_factory = import_item(str(self.kernel_class))
191 self.kernel = kernel_factory(config=self.config, session=self.session,
192 self.kernel = kernel_factory(config=self.config, session=self.session,
192 shell_socket=self.shell_socket,
193 shell_socket=self.shell_socket,
193 iopub_socket=self.iopub_socket,
194 iopub_socket=self.iopub_socket,
194 stdin_socket=self.stdin_socket,
195 stdin_socket=self.stdin_socket,
195 log=self.log
196 log=self.log
196 )
197 )
197 self.kernel.record_ports(self.ports)
198 self.kernel.record_ports(self.ports)
198
199
199 def initialize(self, argv=None):
200 def initialize(self, argv=None):
200 super(KernelApp, self).initialize(argv)
201 super(KernelApp, self).initialize(argv)
201 self.init_session()
202 self.init_session()
202 self.init_poller()
203 self.init_poller()
203 self.init_sockets()
204 self.init_sockets()
204 self.init_io()
205 self.init_io()
205 self.init_kernel()
206 self.init_kernel()
206
207
207 def start(self):
208 def start(self):
208 self.heartbeat.start()
209 self.heartbeat.start()
209 if self.poller is not None:
210 if self.poller is not None:
210 self.poller.start()
211 self.poller.start()
211 try:
212 try:
212 self.kernel.start()
213 self.kernel.start()
213 except KeyboardInterrupt:
214 except KeyboardInterrupt:
214 pass
215 pass
215
216
@@ -1,627 +1,628 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """Session object for building, serializing, sending, and receiving messages in
2 """Session object for building, serializing, sending, and receiving messages in
3 IPython. The Session object supports serialization, HMAC signatures, and
3 IPython. The Session object supports serialization, HMAC signatures, and
4 metadata on messages.
4 metadata on messages.
5
5
6 Also defined here are utilities for working with Sessions:
6 Also defined here are utilities for working with Sessions:
7 * A SessionFactory to be used as a base class for configurables that work with
7 * A SessionFactory to be used as a base class for configurables that work with
8 Sessions.
8 Sessions.
9 * A Message object for convenience that allows attribute-access to the msg dict.
9 * A Message object for convenience that allows attribute-access to the msg dict.
10
10
11 Authors:
11 Authors:
12
12
13 * Min RK
13 * Min RK
14 * Brian Granger
14 * Brian Granger
15 * Fernando Perez
15 * Fernando Perez
16 """
16 """
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18 # Copyright (C) 2010-2011 The IPython Development Team
18 # Copyright (C) 2010-2011 The IPython Development Team
19 #
19 #
20 # Distributed under the terms of the BSD License. The full license is in
20 # Distributed under the terms of the BSD License. The full license is in
21 # the file COPYING, distributed as part of this software.
21 # the file COPYING, distributed as part of this software.
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25 # Imports
25 # Imports
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27
27
28 import hmac
28 import hmac
29 import logging
29 import logging
30 import os
30 import os
31 import pprint
31 import pprint
32 import uuid
32 import uuid
33 from datetime import datetime
33 from datetime import datetime
34
34
35 try:
35 try:
36 import cPickle
36 import cPickle
37 pickle = cPickle
37 pickle = cPickle
38 except:
38 except:
39 cPickle = None
39 cPickle = None
40 import pickle
40 import pickle
41
41
42 import zmq
42 import zmq
43 from zmq.utils import jsonapi
43 from zmq.utils import jsonapi
44 from zmq.eventloop.ioloop import IOLoop
44 from zmq.eventloop.ioloop import IOLoop
45 from zmq.eventloop.zmqstream import ZMQStream
45 from zmq.eventloop.zmqstream import ZMQStream
46
46
47 from IPython.config.configurable import Configurable, LoggingConfigurable
47 from IPython.config.configurable import Configurable, LoggingConfigurable
48 from IPython.utils.importstring import import_item
48 from IPython.utils.importstring import import_item
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 from IPython.utils.traitlets import Bytes, Unicode, Bool, Any, Instance, Set
50 from IPython.utils.traitlets import (Bytes, Unicode, Bool, Any, Instance, Set,
51 DottedObjectName)
51
52
52 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
53 # utility functions
54 # utility functions
54 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
55
56
56 def squash_unicode(obj):
57 def squash_unicode(obj):
57 """coerce unicode back to bytestrings."""
58 """coerce unicode back to bytestrings."""
58 if isinstance(obj,dict):
59 if isinstance(obj,dict):
59 for key in obj.keys():
60 for key in obj.keys():
60 obj[key] = squash_unicode(obj[key])
61 obj[key] = squash_unicode(obj[key])
61 if isinstance(key, unicode):
62 if isinstance(key, unicode):
62 obj[squash_unicode(key)] = obj.pop(key)
63 obj[squash_unicode(key)] = obj.pop(key)
63 elif isinstance(obj, list):
64 elif isinstance(obj, list):
64 for i,v in enumerate(obj):
65 for i,v in enumerate(obj):
65 obj[i] = squash_unicode(v)
66 obj[i] = squash_unicode(v)
66 elif isinstance(obj, unicode):
67 elif isinstance(obj, unicode):
67 obj = obj.encode('utf8')
68 obj = obj.encode('utf8')
68 return obj
69 return obj
69
70
70 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
71 # globals and defaults
72 # globals and defaults
72 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
73 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
74 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
74 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
75 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
75 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
76 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
76
77
77 pickle_packer = lambda o: pickle.dumps(o,-1)
78 pickle_packer = lambda o: pickle.dumps(o,-1)
78 pickle_unpacker = pickle.loads
79 pickle_unpacker = pickle.loads
79
80
80 default_packer = json_packer
81 default_packer = json_packer
81 default_unpacker = json_unpacker
82 default_unpacker = json_unpacker
82
83
83
84
84 DELIM="<IDS|MSG>"
85 DELIM="<IDS|MSG>"
85
86
86 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
87 # Classes
88 # Classes
88 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
89
90
90 class SessionFactory(LoggingConfigurable):
91 class SessionFactory(LoggingConfigurable):
91 """The Base class for configurables that have a Session, Context, logger,
92 """The Base class for configurables that have a Session, Context, logger,
92 and IOLoop.
93 and IOLoop.
93 """
94 """
94
95
95 logname = Unicode('')
96 logname = Unicode('')
96 def _logname_changed(self, name, old, new):
97 def _logname_changed(self, name, old, new):
97 self.log = logging.getLogger(new)
98 self.log = logging.getLogger(new)
98
99
99 # not configurable:
100 # not configurable:
100 context = Instance('zmq.Context')
101 context = Instance('zmq.Context')
101 def _context_default(self):
102 def _context_default(self):
102 return zmq.Context.instance()
103 return zmq.Context.instance()
103
104
104 session = Instance('IPython.zmq.session.Session')
105 session = Instance('IPython.zmq.session.Session')
105
106
106 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
107 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
107 def _loop_default(self):
108 def _loop_default(self):
108 return IOLoop.instance()
109 return IOLoop.instance()
109
110
110 def __init__(self, **kwargs):
111 def __init__(self, **kwargs):
111 super(SessionFactory, self).__init__(**kwargs)
112 super(SessionFactory, self).__init__(**kwargs)
112
113
113 if self.session is None:
114 if self.session is None:
114 # construct the session
115 # construct the session
115 self.session = Session(**kwargs)
116 self.session = Session(**kwargs)
116
117
117
118
118 class Message(object):
119 class Message(object):
119 """A simple message object that maps dict keys to attributes.
120 """A simple message object that maps dict keys to attributes.
120
121
121 A Message can be created from a dict and a dict from a Message instance
122 A Message can be created from a dict and a dict from a Message instance
122 simply by calling dict(msg_obj)."""
123 simply by calling dict(msg_obj)."""
123
124
124 def __init__(self, msg_dict):
125 def __init__(self, msg_dict):
125 dct = self.__dict__
126 dct = self.__dict__
126 for k, v in dict(msg_dict).iteritems():
127 for k, v in dict(msg_dict).iteritems():
127 if isinstance(v, dict):
128 if isinstance(v, dict):
128 v = Message(v)
129 v = Message(v)
129 dct[k] = v
130 dct[k] = v
130
131
131 # Having this iterator lets dict(msg_obj) work out of the box.
132 # Having this iterator lets dict(msg_obj) work out of the box.
132 def __iter__(self):
133 def __iter__(self):
133 return iter(self.__dict__.iteritems())
134 return iter(self.__dict__.iteritems())
134
135
135 def __repr__(self):
136 def __repr__(self):
136 return repr(self.__dict__)
137 return repr(self.__dict__)
137
138
138 def __str__(self):
139 def __str__(self):
139 return pprint.pformat(self.__dict__)
140 return pprint.pformat(self.__dict__)
140
141
141 def __contains__(self, k):
142 def __contains__(self, k):
142 return k in self.__dict__
143 return k in self.__dict__
143
144
144 def __getitem__(self, k):
145 def __getitem__(self, k):
145 return self.__dict__[k]
146 return self.__dict__[k]
146
147
147
148
148 def msg_header(msg_id, msg_type, username, session):
149 def msg_header(msg_id, msg_type, username, session):
149 date = datetime.now()
150 date = datetime.now()
150 return locals()
151 return locals()
151
152
152 def extract_header(msg_or_header):
153 def extract_header(msg_or_header):
153 """Given a message or header, return the header."""
154 """Given a message or header, return the header."""
154 if not msg_or_header:
155 if not msg_or_header:
155 return {}
156 return {}
156 try:
157 try:
157 # See if msg_or_header is the entire message.
158 # See if msg_or_header is the entire message.
158 h = msg_or_header['header']
159 h = msg_or_header['header']
159 except KeyError:
160 except KeyError:
160 try:
161 try:
161 # See if msg_or_header is just the header
162 # See if msg_or_header is just the header
162 h = msg_or_header['msg_id']
163 h = msg_or_header['msg_id']
163 except KeyError:
164 except KeyError:
164 raise
165 raise
165 else:
166 else:
166 h = msg_or_header
167 h = msg_or_header
167 if not isinstance(h, dict):
168 if not isinstance(h, dict):
168 h = dict(h)
169 h = dict(h)
169 return h
170 return h
170
171
171 class Session(Configurable):
172 class Session(Configurable):
172 """Object for handling serialization and sending of messages.
173 """Object for handling serialization and sending of messages.
173
174
174 The Session object handles building messages and sending them
175 The Session object handles building messages and sending them
175 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
176 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
176 other over the network via Session objects, and only need to work with the
177 other over the network via Session objects, and only need to work with the
177 dict-based IPython message spec. The Session will handle
178 dict-based IPython message spec. The Session will handle
178 serialization/deserialization, security, and metadata.
179 serialization/deserialization, security, and metadata.
179
180
180 Sessions support configurable serialiization via packer/unpacker traits,
181 Sessions support configurable serialiization via packer/unpacker traits,
181 and signing with HMAC digests via the key/keyfile traits.
182 and signing with HMAC digests via the key/keyfile traits.
182
183
183 Parameters
184 Parameters
184 ----------
185 ----------
185
186
186 debug : bool
187 debug : bool
187 whether to trigger extra debugging statements
188 whether to trigger extra debugging statements
188 packer/unpacker : str : 'json', 'pickle' or import_string
189 packer/unpacker : str : 'json', 'pickle' or import_string
189 importstrings for methods to serialize message parts. If just
190 importstrings for methods to serialize message parts. If just
190 'json' or 'pickle', predefined JSON and pickle packers will be used.
191 'json' or 'pickle', predefined JSON and pickle packers will be used.
191 Otherwise, the entire importstring must be used.
192 Otherwise, the entire importstring must be used.
192
193
193 The functions must accept at least valid JSON input, and output *bytes*.
194 The functions must accept at least valid JSON input, and output *bytes*.
194
195
195 For example, to use msgpack:
196 For example, to use msgpack:
196 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
197 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
197 pack/unpack : callables
198 pack/unpack : callables
198 You can also set the pack/unpack callables for serialization directly.
199 You can also set the pack/unpack callables for serialization directly.
199 session : bytes
200 session : bytes
200 the ID of this Session object. The default is to generate a new UUID.
201 the ID of this Session object. The default is to generate a new UUID.
201 username : unicode
202 username : unicode
202 username added to message headers. The default is to ask the OS.
203 username added to message headers. The default is to ask the OS.
203 key : bytes
204 key : bytes
204 The key used to initialize an HMAC signature. If unset, messages
205 The key used to initialize an HMAC signature. If unset, messages
205 will not be signed or checked.
206 will not be signed or checked.
206 keyfile : filepath
207 keyfile : filepath
207 The file containing a key. If this is set, `key` will be initialized
208 The file containing a key. If this is set, `key` will be initialized
208 to the contents of the file.
209 to the contents of the file.
209
210
210 """
211 """
211
212
212 debug=Bool(False, config=True, help="""Debug output in the Session""")
213 debug=Bool(False, config=True, help="""Debug output in the Session""")
213
214
214 packer = Unicode('json',config=True,
215 packer = DottedObjectName('json',config=True,
215 help="""The name of the packer for serializing messages.
216 help="""The name of the packer for serializing messages.
216 Should be one of 'json', 'pickle', or an import name
217 Should be one of 'json', 'pickle', or an import name
217 for a custom callable serializer.""")
218 for a custom callable serializer.""")
218 def _packer_changed(self, name, old, new):
219 def _packer_changed(self, name, old, new):
219 if new.lower() == 'json':
220 if new.lower() == 'json':
220 self.pack = json_packer
221 self.pack = json_packer
221 self.unpack = json_unpacker
222 self.unpack = json_unpacker
222 elif new.lower() == 'pickle':
223 elif new.lower() == 'pickle':
223 self.pack = pickle_packer
224 self.pack = pickle_packer
224 self.unpack = pickle_unpacker
225 self.unpack = pickle_unpacker
225 else:
226 else:
226 self.pack = import_item(str(new))
227 self.pack = import_item(str(new))
227
228
228 unpacker = Unicode('json', config=True,
229 unpacker = DottedObjectName('json', config=True,
229 help="""The name of the unpacker for unserializing messages.
230 help="""The name of the unpacker for unserializing messages.
230 Only used with custom functions for `packer`.""")
231 Only used with custom functions for `packer`.""")
231 def _unpacker_changed(self, name, old, new):
232 def _unpacker_changed(self, name, old, new):
232 if new.lower() == 'json':
233 if new.lower() == 'json':
233 self.pack = json_packer
234 self.pack = json_packer
234 self.unpack = json_unpacker
235 self.unpack = json_unpacker
235 elif new.lower() == 'pickle':
236 elif new.lower() == 'pickle':
236 self.pack = pickle_packer
237 self.pack = pickle_packer
237 self.unpack = pickle_unpacker
238 self.unpack = pickle_unpacker
238 else:
239 else:
239 self.unpack = import_item(str(new))
240 self.unpack = import_item(str(new))
240
241
241 session = Bytes(b'', config=True,
242 session = Bytes(b'', config=True,
242 help="""The UUID identifying this session.""")
243 help="""The UUID identifying this session.""")
243 def _session_default(self):
244 def _session_default(self):
244 return bytes(uuid.uuid4())
245 return bytes(uuid.uuid4())
245
246
246 username = Unicode(os.environ.get('USER','username'), config=True,
247 username = Unicode(os.environ.get('USER','username'), config=True,
247 help="""Username for the Session. Default is your system username.""")
248 help="""Username for the Session. Default is your system username.""")
248
249
249 # message signature related traits:
250 # message signature related traits:
250 key = Bytes(b'', config=True,
251 key = Bytes(b'', config=True,
251 help="""execution key, for extra authentication.""")
252 help="""execution key, for extra authentication.""")
252 def _key_changed(self, name, old, new):
253 def _key_changed(self, name, old, new):
253 if new:
254 if new:
254 self.auth = hmac.HMAC(new)
255 self.auth = hmac.HMAC(new)
255 else:
256 else:
256 self.auth = None
257 self.auth = None
257 auth = Instance(hmac.HMAC)
258 auth = Instance(hmac.HMAC)
258 digest_history = Set()
259 digest_history = Set()
259
260
260 keyfile = Unicode('', config=True,
261 keyfile = Unicode('', config=True,
261 help="""path to file containing execution key.""")
262 help="""path to file containing execution key.""")
262 def _keyfile_changed(self, name, old, new):
263 def _keyfile_changed(self, name, old, new):
263 with open(new, 'rb') as f:
264 with open(new, 'rb') as f:
264 self.key = f.read().strip()
265 self.key = f.read().strip()
265
266
266 pack = Any(default_packer) # the actual packer function
267 pack = Any(default_packer) # the actual packer function
267 def _pack_changed(self, name, old, new):
268 def _pack_changed(self, name, old, new):
268 if not callable(new):
269 if not callable(new):
269 raise TypeError("packer must be callable, not %s"%type(new))
270 raise TypeError("packer must be callable, not %s"%type(new))
270
271
271 unpack = Any(default_unpacker) # the actual packer function
272 unpack = Any(default_unpacker) # the actual packer function
272 def _unpack_changed(self, name, old, new):
273 def _unpack_changed(self, name, old, new):
273 # unpacker is not checked - it is assumed to be
274 # unpacker is not checked - it is assumed to be
274 if not callable(new):
275 if not callable(new):
275 raise TypeError("unpacker must be callable, not %s"%type(new))
276 raise TypeError("unpacker must be callable, not %s"%type(new))
276
277
277 def __init__(self, **kwargs):
278 def __init__(self, **kwargs):
278 """create a Session object
279 """create a Session object
279
280
280 Parameters
281 Parameters
281 ----------
282 ----------
282
283
283 debug : bool
284 debug : bool
284 whether to trigger extra debugging statements
285 whether to trigger extra debugging statements
285 packer/unpacker : str : 'json', 'pickle' or import_string
286 packer/unpacker : str : 'json', 'pickle' or import_string
286 importstrings for methods to serialize message parts. If just
287 importstrings for methods to serialize message parts. If just
287 'json' or 'pickle', predefined JSON and pickle packers will be used.
288 'json' or 'pickle', predefined JSON and pickle packers will be used.
288 Otherwise, the entire importstring must be used.
289 Otherwise, the entire importstring must be used.
289
290
290 The functions must accept at least valid JSON input, and output
291 The functions must accept at least valid JSON input, and output
291 *bytes*.
292 *bytes*.
292
293
293 For example, to use msgpack:
294 For example, to use msgpack:
294 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
295 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
295 pack/unpack : callables
296 pack/unpack : callables
296 You can also set the pack/unpack callables for serialization
297 You can also set the pack/unpack callables for serialization
297 directly.
298 directly.
298 session : bytes
299 session : bytes
299 the ID of this Session object. The default is to generate a new
300 the ID of this Session object. The default is to generate a new
300 UUID.
301 UUID.
301 username : unicode
302 username : unicode
302 username added to message headers. The default is to ask the OS.
303 username added to message headers. The default is to ask the OS.
303 key : bytes
304 key : bytes
304 The key used to initialize an HMAC signature. If unset, messages
305 The key used to initialize an HMAC signature. If unset, messages
305 will not be signed or checked.
306 will not be signed or checked.
306 keyfile : filepath
307 keyfile : filepath
307 The file containing a key. If this is set, `key` will be
308 The file containing a key. If this is set, `key` will be
308 initialized to the contents of the file.
309 initialized to the contents of the file.
309 """
310 """
310 super(Session, self).__init__(**kwargs)
311 super(Session, self).__init__(**kwargs)
311 self._check_packers()
312 self._check_packers()
312 self.none = self.pack({})
313 self.none = self.pack({})
313
314
314 @property
315 @property
315 def msg_id(self):
316 def msg_id(self):
316 """always return new uuid"""
317 """always return new uuid"""
317 return str(uuid.uuid4())
318 return str(uuid.uuid4())
318
319
319 def _check_packers(self):
320 def _check_packers(self):
320 """check packers for binary data and datetime support."""
321 """check packers for binary data and datetime support."""
321 pack = self.pack
322 pack = self.pack
322 unpack = self.unpack
323 unpack = self.unpack
323
324
324 # check simple serialization
325 # check simple serialization
325 msg = dict(a=[1,'hi'])
326 msg = dict(a=[1,'hi'])
326 try:
327 try:
327 packed = pack(msg)
328 packed = pack(msg)
328 except Exception:
329 except Exception:
329 raise ValueError("packer could not serialize a simple message")
330 raise ValueError("packer could not serialize a simple message")
330
331
331 # ensure packed message is bytes
332 # ensure packed message is bytes
332 if not isinstance(packed, bytes):
333 if not isinstance(packed, bytes):
333 raise ValueError("message packed to %r, but bytes are required"%type(packed))
334 raise ValueError("message packed to %r, but bytes are required"%type(packed))
334
335
335 # check that unpack is pack's inverse
336 # check that unpack is pack's inverse
336 try:
337 try:
337 unpacked = unpack(packed)
338 unpacked = unpack(packed)
338 except Exception:
339 except Exception:
339 raise ValueError("unpacker could not handle the packer's output")
340 raise ValueError("unpacker could not handle the packer's output")
340
341
341 # check datetime support
342 # check datetime support
342 msg = dict(t=datetime.now())
343 msg = dict(t=datetime.now())
343 try:
344 try:
344 unpacked = unpack(pack(msg))
345 unpacked = unpack(pack(msg))
345 except Exception:
346 except Exception:
346 self.pack = lambda o: pack(squash_dates(o))
347 self.pack = lambda o: pack(squash_dates(o))
347 self.unpack = lambda s: extract_dates(unpack(s))
348 self.unpack = lambda s: extract_dates(unpack(s))
348
349
349 def msg_header(self, msg_type):
350 def msg_header(self, msg_type):
350 return msg_header(self.msg_id, msg_type, self.username, self.session)
351 return msg_header(self.msg_id, msg_type, self.username, self.session)
351
352
352 def msg(self, msg_type, content=None, parent=None, subheader=None):
353 def msg(self, msg_type, content=None, parent=None, subheader=None):
353 msg = {}
354 msg = {}
354 msg['header'] = self.msg_header(msg_type)
355 msg['header'] = self.msg_header(msg_type)
355 msg['msg_id'] = msg['header']['msg_id']
356 msg['msg_id'] = msg['header']['msg_id']
356 msg['parent_header'] = {} if parent is None else extract_header(parent)
357 msg['parent_header'] = {} if parent is None else extract_header(parent)
357 msg['msg_type'] = msg_type
358 msg['msg_type'] = msg_type
358 msg['content'] = {} if content is None else content
359 msg['content'] = {} if content is None else content
359 sub = {} if subheader is None else subheader
360 sub = {} if subheader is None else subheader
360 msg['header'].update(sub)
361 msg['header'].update(sub)
361 return msg
362 return msg
362
363
363 def sign(self, msg):
364 def sign(self, msg):
364 """Sign a message with HMAC digest. If no auth, return b''."""
365 """Sign a message with HMAC digest. If no auth, return b''."""
365 if self.auth is None:
366 if self.auth is None:
366 return b''
367 return b''
367 h = self.auth.copy()
368 h = self.auth.copy()
368 for m in msg:
369 for m in msg:
369 h.update(m)
370 h.update(m)
370 return h.hexdigest()
371 return h.hexdigest()
371
372
372 def serialize(self, msg, ident=None):
373 def serialize(self, msg, ident=None):
373 """Serialize the message components to bytes.
374 """Serialize the message components to bytes.
374
375
375 Returns
376 Returns
376 -------
377 -------
377
378
378 list of bytes objects
379 list of bytes objects
379
380
380 """
381 """
381 content = msg.get('content', {})
382 content = msg.get('content', {})
382 if content is None:
383 if content is None:
383 content = self.none
384 content = self.none
384 elif isinstance(content, dict):
385 elif isinstance(content, dict):
385 content = self.pack(content)
386 content = self.pack(content)
386 elif isinstance(content, bytes):
387 elif isinstance(content, bytes):
387 # content is already packed, as in a relayed message
388 # content is already packed, as in a relayed message
388 pass
389 pass
389 elif isinstance(content, unicode):
390 elif isinstance(content, unicode):
390 # should be bytes, but JSON often spits out unicode
391 # should be bytes, but JSON often spits out unicode
391 content = content.encode('utf8')
392 content = content.encode('utf8')
392 else:
393 else:
393 raise TypeError("Content incorrect type: %s"%type(content))
394 raise TypeError("Content incorrect type: %s"%type(content))
394
395
395 real_message = [self.pack(msg['header']),
396 real_message = [self.pack(msg['header']),
396 self.pack(msg['parent_header']),
397 self.pack(msg['parent_header']),
397 content
398 content
398 ]
399 ]
399
400
400 to_send = []
401 to_send = []
401
402
402 if isinstance(ident, list):
403 if isinstance(ident, list):
403 # accept list of idents
404 # accept list of idents
404 to_send.extend(ident)
405 to_send.extend(ident)
405 elif ident is not None:
406 elif ident is not None:
406 to_send.append(ident)
407 to_send.append(ident)
407 to_send.append(DELIM)
408 to_send.append(DELIM)
408
409
409 signature = self.sign(real_message)
410 signature = self.sign(real_message)
410 to_send.append(signature)
411 to_send.append(signature)
411
412
412 to_send.extend(real_message)
413 to_send.extend(real_message)
413
414
414 return to_send
415 return to_send
415
416
416 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
417 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
417 buffers=None, subheader=None, track=False):
418 buffers=None, subheader=None, track=False):
418 """Build and send a message via stream or socket.
419 """Build and send a message via stream or socket.
419
420
420 Parameters
421 Parameters
421 ----------
422 ----------
422
423
423 stream : zmq.Socket or ZMQStream
424 stream : zmq.Socket or ZMQStream
424 the socket-like object used to send the data
425 the socket-like object used to send the data
425 msg_or_type : str or Message/dict
426 msg_or_type : str or Message/dict
426 Normally, msg_or_type will be a msg_type unless a message is being
427 Normally, msg_or_type will be a msg_type unless a message is being
427 sent more than once.
428 sent more than once.
428
429
429 content : dict or None
430 content : dict or None
430 the content of the message (ignored if msg_or_type is a message)
431 the content of the message (ignored if msg_or_type is a message)
431 parent : Message or dict or None
432 parent : Message or dict or None
432 the parent or parent header describing the parent of this message
433 the parent or parent header describing the parent of this message
433 ident : bytes or list of bytes
434 ident : bytes or list of bytes
434 the zmq.IDENTITY routing path
435 the zmq.IDENTITY routing path
435 subheader : dict or None
436 subheader : dict or None
436 extra header keys for this message's header
437 extra header keys for this message's header
437 buffers : list or None
438 buffers : list or None
438 the already-serialized buffers to be appended to the message
439 the already-serialized buffers to be appended to the message
439 track : bool
440 track : bool
440 whether to track. Only for use with Sockets,
441 whether to track. Only for use with Sockets,
441 because ZMQStream objects cannot track messages.
442 because ZMQStream objects cannot track messages.
442
443
443 Returns
444 Returns
444 -------
445 -------
445 msg : message dict
446 msg : message dict
446 the constructed message
447 the constructed message
447 (msg,tracker) : (message dict, MessageTracker)
448 (msg,tracker) : (message dict, MessageTracker)
448 if track=True, then a 2-tuple will be returned,
449 if track=True, then a 2-tuple will be returned,
449 the first element being the constructed
450 the first element being the constructed
450 message, and the second being the MessageTracker
451 message, and the second being the MessageTracker
451
452
452 """
453 """
453
454
454 if not isinstance(stream, (zmq.Socket, ZMQStream)):
455 if not isinstance(stream, (zmq.Socket, ZMQStream)):
455 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
456 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
456 elif track and isinstance(stream, ZMQStream):
457 elif track and isinstance(stream, ZMQStream):
457 raise TypeError("ZMQStream cannot track messages")
458 raise TypeError("ZMQStream cannot track messages")
458
459
459 if isinstance(msg_or_type, (Message, dict)):
460 if isinstance(msg_or_type, (Message, dict)):
460 # we got a Message, not a msg_type
461 # we got a Message, not a msg_type
461 # don't build a new Message
462 # don't build a new Message
462 msg = msg_or_type
463 msg = msg_or_type
463 else:
464 else:
464 msg = self.msg(msg_or_type, content, parent, subheader)
465 msg = self.msg(msg_or_type, content, parent, subheader)
465
466
466 buffers = [] if buffers is None else buffers
467 buffers = [] if buffers is None else buffers
467 to_send = self.serialize(msg, ident)
468 to_send = self.serialize(msg, ident)
468 flag = 0
469 flag = 0
469 if buffers:
470 if buffers:
470 flag = zmq.SNDMORE
471 flag = zmq.SNDMORE
471 _track = False
472 _track = False
472 else:
473 else:
473 _track=track
474 _track=track
474 if track:
475 if track:
475 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
476 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
476 else:
477 else:
477 tracker = stream.send_multipart(to_send, flag, copy=False)
478 tracker = stream.send_multipart(to_send, flag, copy=False)
478 for b in buffers[:-1]:
479 for b in buffers[:-1]:
479 stream.send(b, flag, copy=False)
480 stream.send(b, flag, copy=False)
480 if buffers:
481 if buffers:
481 if track:
482 if track:
482 tracker = stream.send(buffers[-1], copy=False, track=track)
483 tracker = stream.send(buffers[-1], copy=False, track=track)
483 else:
484 else:
484 tracker = stream.send(buffers[-1], copy=False)
485 tracker = stream.send(buffers[-1], copy=False)
485
486
486 # omsg = Message(msg)
487 # omsg = Message(msg)
487 if self.debug:
488 if self.debug:
488 pprint.pprint(msg)
489 pprint.pprint(msg)
489 pprint.pprint(to_send)
490 pprint.pprint(to_send)
490 pprint.pprint(buffers)
491 pprint.pprint(buffers)
491
492
492 msg['tracker'] = tracker
493 msg['tracker'] = tracker
493
494
494 return msg
495 return msg
495
496
496 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
497 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
497 """Send a raw message via ident path.
498 """Send a raw message via ident path.
498
499
499 Parameters
500 Parameters
500 ----------
501 ----------
501 msg : list of sendable buffers"""
502 msg : list of sendable buffers"""
502 to_send = []
503 to_send = []
503 if isinstance(ident, bytes):
504 if isinstance(ident, bytes):
504 ident = [ident]
505 ident = [ident]
505 if ident is not None:
506 if ident is not None:
506 to_send.extend(ident)
507 to_send.extend(ident)
507
508
508 to_send.append(DELIM)
509 to_send.append(DELIM)
509 to_send.append(self.sign(msg))
510 to_send.append(self.sign(msg))
510 to_send.extend(msg)
511 to_send.extend(msg)
511 stream.send_multipart(msg, flags, copy=copy)
512 stream.send_multipart(msg, flags, copy=copy)
512
513
513 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
514 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
514 """receives and unpacks a message
515 """receives and unpacks a message
515 returns [idents], msg"""
516 returns [idents], msg"""
516 if isinstance(socket, ZMQStream):
517 if isinstance(socket, ZMQStream):
517 socket = socket.socket
518 socket = socket.socket
518 try:
519 try:
519 msg = socket.recv_multipart(mode)
520 msg = socket.recv_multipart(mode)
520 except zmq.ZMQError as e:
521 except zmq.ZMQError as e:
521 if e.errno == zmq.EAGAIN:
522 if e.errno == zmq.EAGAIN:
522 # We can convert EAGAIN to None as we know in this case
523 # We can convert EAGAIN to None as we know in this case
523 # recv_multipart won't return None.
524 # recv_multipart won't return None.
524 return None,None
525 return None,None
525 else:
526 else:
526 raise
527 raise
527 # split multipart message into identity list and message dict
528 # split multipart message into identity list and message dict
528 # invalid large messages can cause very expensive string comparisons
529 # invalid large messages can cause very expensive string comparisons
529 idents, msg = self.feed_identities(msg, copy)
530 idents, msg = self.feed_identities(msg, copy)
530 try:
531 try:
531 return idents, self.unpack_message(msg, content=content, copy=copy)
532 return idents, self.unpack_message(msg, content=content, copy=copy)
532 except Exception as e:
533 except Exception as e:
533 print (idents, msg)
534 print (idents, msg)
534 # TODO: handle it
535 # TODO: handle it
535 raise e
536 raise e
536
537
537 def feed_identities(self, msg, copy=True):
538 def feed_identities(self, msg, copy=True):
538 """feed until DELIM is reached, then return the prefix as idents and
539 """feed until DELIM is reached, then return the prefix as idents and
539 remainder as msg. This is easily broken by setting an IDENT to DELIM,
540 remainder as msg. This is easily broken by setting an IDENT to DELIM,
540 but that would be silly.
541 but that would be silly.
541
542
542 Parameters
543 Parameters
543 ----------
544 ----------
544 msg : a list of Message or bytes objects
545 msg : a list of Message or bytes objects
545 the message to be split
546 the message to be split
546 copy : bool
547 copy : bool
547 flag determining whether the arguments are bytes or Messages
548 flag determining whether the arguments are bytes or Messages
548
549
549 Returns
550 Returns
550 -------
551 -------
551 (idents,msg) : two lists
552 (idents,msg) : two lists
552 idents will always be a list of bytes - the indentity prefix
553 idents will always be a list of bytes - the indentity prefix
553 msg will be a list of bytes or Messages, unchanged from input
554 msg will be a list of bytes or Messages, unchanged from input
554 msg should be unpackable via self.unpack_message at this point.
555 msg should be unpackable via self.unpack_message at this point.
555 """
556 """
556 if copy:
557 if copy:
557 idx = msg.index(DELIM)
558 idx = msg.index(DELIM)
558 return msg[:idx], msg[idx+1:]
559 return msg[:idx], msg[idx+1:]
559 else:
560 else:
560 failed = True
561 failed = True
561 for idx,m in enumerate(msg):
562 for idx,m in enumerate(msg):
562 if m.bytes == DELIM:
563 if m.bytes == DELIM:
563 failed = False
564 failed = False
564 break
565 break
565 if failed:
566 if failed:
566 raise ValueError("DELIM not in msg")
567 raise ValueError("DELIM not in msg")
567 idents, msg = msg[:idx], msg[idx+1:]
568 idents, msg = msg[:idx], msg[idx+1:]
568 return [m.bytes for m in idents], msg
569 return [m.bytes for m in idents], msg
569
570
570 def unpack_message(self, msg, content=True, copy=True):
571 def unpack_message(self, msg, content=True, copy=True):
571 """Return a message object from the format
572 """Return a message object from the format
572 sent by self.send.
573 sent by self.send.
573
574
574 Parameters:
575 Parameters:
575 -----------
576 -----------
576
577
577 content : bool (True)
578 content : bool (True)
578 whether to unpack the content dict (True),
579 whether to unpack the content dict (True),
579 or leave it serialized (False)
580 or leave it serialized (False)
580
581
581 copy : bool (True)
582 copy : bool (True)
582 whether to return the bytes (True),
583 whether to return the bytes (True),
583 or the non-copying Message object in each place (False)
584 or the non-copying Message object in each place (False)
584
585
585 """
586 """
586 minlen = 4
587 minlen = 4
587 message = {}
588 message = {}
588 if not copy:
589 if not copy:
589 for i in range(minlen):
590 for i in range(minlen):
590 msg[i] = msg[i].bytes
591 msg[i] = msg[i].bytes
591 if self.auth is not None:
592 if self.auth is not None:
592 signature = msg[0]
593 signature = msg[0]
593 if signature in self.digest_history:
594 if signature in self.digest_history:
594 raise ValueError("Duplicate Signature: %r"%signature)
595 raise ValueError("Duplicate Signature: %r"%signature)
595 self.digest_history.add(signature)
596 self.digest_history.add(signature)
596 check = self.sign(msg[1:4])
597 check = self.sign(msg[1:4])
597 if not signature == check:
598 if not signature == check:
598 raise ValueError("Invalid Signature: %r"%signature)
599 raise ValueError("Invalid Signature: %r"%signature)
599 if not len(msg) >= minlen:
600 if not len(msg) >= minlen:
600 raise TypeError("malformed message, must have at least %i elements"%minlen)
601 raise TypeError("malformed message, must have at least %i elements"%minlen)
601 message['header'] = self.unpack(msg[1])
602 message['header'] = self.unpack(msg[1])
602 message['msg_type'] = message['header']['msg_type']
603 message['msg_type'] = message['header']['msg_type']
603 message['parent_header'] = self.unpack(msg[2])
604 message['parent_header'] = self.unpack(msg[2])
604 if content:
605 if content:
605 message['content'] = self.unpack(msg[3])
606 message['content'] = self.unpack(msg[3])
606 else:
607 else:
607 message['content'] = msg[3]
608 message['content'] = msg[3]
608
609
609 message['buffers'] = msg[4:]
610 message['buffers'] = msg[4:]
610 return message
611 return message
611
612
612 def test_msg2obj():
613 def test_msg2obj():
613 am = dict(x=1)
614 am = dict(x=1)
614 ao = Message(am)
615 ao = Message(am)
615 assert ao.x == am['x']
616 assert ao.x == am['x']
616
617
617 am['y'] = dict(z=1)
618 am['y'] = dict(z=1)
618 ao = Message(am)
619 ao = Message(am)
619 assert ao.y.z == am['y']['z']
620 assert ao.y.z == am['y']['z']
620
621
621 k1, k2 = 'y', 'z'
622 k1, k2 = 'y', 'z'
622 assert ao[k1][k2] == am[k1][k2]
623 assert ao[k1][k2] == am[k1][k2]
623
624
624 am2 = dict(ao)
625 am2 = dict(ao)
625 assert am['x'] == am2['x']
626 assert am['x'] == am2['x']
626 assert am['y']['z'] == am2['y']['z']
627 assert am['y']['z'] == am2['y']['z']
627
628
General Comments 0
You need to be logged in to leave comments. Login now