##// END OF EJS Templates
Merge pull request #1652 from minrk/zmqcompat...
Fernando Perez -
r6632:64a50619 merge
parent child Browse files
Show More
@@ -1,15 +1,20 b''
1 1 """The IPython HTML Notebook"""
2 2
3 3 # check for tornado 2.1.0
4 4 msg = "The IPython Notebook requires tornado >= 2.1.0"
5 5 try:
6 6 import tornado
7 7 except ImportError:
8 8 raise ImportError(msg)
9 9 try:
10 10 version_info = tornado.version_info
11 11 except AttributeError:
12 12 raise ImportError(msg + ", but you have < 1.1.0")
13 13 if version_info < (2,1,0):
14 14 raise ImportError(msg + ", but you have %s" % tornado.version)
15 15 del msg
16
17 # check for pyzmq 2.1.4
18 from IPython.zmq import check_for_zmq
19 check_for_zmq('2.1.4', 'IPython.frontend.html.notebook')
20 del check_for_zmq
@@ -1,569 +1,563 b''
1 1 # coding: utf-8
2 2 """A tornado based IPython notebook server.
3 3
4 4 Authors:
5 5
6 6 * Brian Granger
7 7 """
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 # stdlib
20 20 import errno
21 21 import logging
22 22 import os
23 23 import re
24 24 import select
25 25 import signal
26 26 import socket
27 27 import sys
28 28 import threading
29 29 import time
30 30 import webbrowser
31 31
32 32 # Third party
33 33 import zmq
34 34
35 35 # Install the pyzmq ioloop. This has to be done before anything else from
36 36 # tornado is imported.
37 37 from zmq.eventloop import ioloop
38 # FIXME: ioloop.install is new in pyzmq-2.1.7, so remove this conditional
39 # when pyzmq dependency is updated beyond that.
40 if hasattr(ioloop, 'install'):
41 ioloop.install()
42 else:
43 import tornado.ioloop
44 tornado.ioloop.IOLoop = ioloop.IOLoop
38 ioloop.install()
45 39
46 40 from tornado import httpserver
47 41 from tornado import web
48 42
49 43 # Our own libraries
50 44 from .kernelmanager import MappingKernelManager
51 45 from .handlers import (LoginHandler, LogoutHandler,
52 46 ProjectDashboardHandler, NewHandler, NamedNotebookHandler,
53 47 MainKernelHandler, KernelHandler, KernelActionHandler, IOPubHandler,
54 48 ShellHandler, NotebookRootHandler, NotebookHandler, NotebookCopyHandler,
55 49 RSTHandler, AuthenticatedFileHandler, PrintNotebookHandler,
56 50 MainClusterHandler, ClusterProfileHandler, ClusterActionHandler
57 51 )
58 52 from .notebookmanager import NotebookManager
59 53 from .clustermanager import ClusterManager
60 54
61 55 from IPython.config.application import catch_config_error, boolean_flag
62 56 from IPython.core.application import BaseIPythonApplication
63 57 from IPython.core.profiledir import ProfileDir
64 58 from IPython.lib.kernel import swallow_argv
65 59 from IPython.zmq.session import Session, default_secure
66 60 from IPython.zmq.zmqshell import ZMQInteractiveShell
67 61 from IPython.zmq.ipkernel import (
68 62 flags as ipkernel_flags,
69 63 aliases as ipkernel_aliases,
70 64 IPKernelApp
71 65 )
72 66 from IPython.utils.traitlets import Dict, Unicode, Integer, List, Enum, Bool
73 67 from IPython.utils import py3compat
74 68
75 69 #-----------------------------------------------------------------------------
76 70 # Module globals
77 71 #-----------------------------------------------------------------------------
78 72
79 73 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
80 74 _kernel_action_regex = r"(?P<action>restart|interrupt)"
81 75 _notebook_id_regex = r"(?P<notebook_id>\w+-\w+-\w+-\w+-\w+)"
82 76 _profile_regex = r"(?P<profile>[a-zA-Z0-9]+)"
83 77 _cluster_action_regex = r"(?P<action>start|stop)"
84 78
85 79
86 80 LOCALHOST = '127.0.0.1'
87 81
88 82 _examples = """
89 83 ipython notebook # start the notebook
90 84 ipython notebook --profile=sympy # use the sympy profile
91 85 ipython notebook --pylab=inline # pylab in inline plotting mode
92 86 ipython notebook --certfile=mycert.pem # use SSL/TLS certificate
93 87 ipython notebook --port=5555 --ip=* # Listen on port 5555, all interfaces
94 88 """
95 89
96 90 #-----------------------------------------------------------------------------
97 91 # Helper functions
98 92 #-----------------------------------------------------------------------------
99 93
100 94 def url_path_join(a,b):
101 95 if a.endswith('/') and b.startswith('/'):
102 96 return a[:-1]+b
103 97 else:
104 98 return a+b
105 99
106 100 #-----------------------------------------------------------------------------
107 101 # The Tornado web application
108 102 #-----------------------------------------------------------------------------
109 103
110 104 class NotebookWebApplication(web.Application):
111 105
112 106 def __init__(self, ipython_app, kernel_manager, notebook_manager,
113 107 cluster_manager, log,
114 108 base_project_url, settings_overrides):
115 109 handlers = [
116 110 (r"/", ProjectDashboardHandler),
117 111 (r"/login", LoginHandler),
118 112 (r"/logout", LogoutHandler),
119 113 (r"/new", NewHandler),
120 114 (r"/%s" % _notebook_id_regex, NamedNotebookHandler),
121 115 (r"/%s/copy" % _notebook_id_regex, NotebookCopyHandler),
122 116 (r"/%s/print" % _notebook_id_regex, PrintNotebookHandler),
123 117 (r"/kernels", MainKernelHandler),
124 118 (r"/kernels/%s" % _kernel_id_regex, KernelHandler),
125 119 (r"/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
126 120 (r"/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
127 121 (r"/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
128 122 (r"/notebooks", NotebookRootHandler),
129 123 (r"/notebooks/%s" % _notebook_id_regex, NotebookHandler),
130 124 (r"/rstservice/render", RSTHandler),
131 125 (r"/files/(.*)", AuthenticatedFileHandler, {'path' : notebook_manager.notebook_dir}),
132 126 (r"/clusters", MainClusterHandler),
133 127 (r"/clusters/%s/%s" % (_profile_regex, _cluster_action_regex), ClusterActionHandler),
134 128 (r"/clusters/%s" % _profile_regex, ClusterProfileHandler),
135 129 ]
136 130 settings = dict(
137 131 template_path=os.path.join(os.path.dirname(__file__), "templates"),
138 132 static_path=os.path.join(os.path.dirname(__file__), "static"),
139 133 cookie_secret=os.urandom(1024),
140 134 login_url="/login",
141 135 )
142 136
143 137 # allow custom overrides for the tornado web app.
144 138 settings.update(settings_overrides)
145 139
146 140 # Python < 2.6.5 doesn't accept unicode keys in f(**kwargs), and
147 141 # base_project_url will always be unicode, which will in turn
148 142 # make the patterns unicode, and ultimately result in unicode
149 143 # keys in kwargs to handler._execute(**kwargs) in tornado.
150 144 # This enforces that base_project_url be ascii in that situation.
151 145 #
152 146 # Note that the URLs these patterns check against are escaped,
153 147 # and thus guaranteed to be ASCII: 'hΓ©llo' is really 'h%C3%A9llo'.
154 148 base_project_url = py3compat.unicode_to_str(base_project_url, 'ascii')
155 149
156 150 # prepend base_project_url onto the patterns that we match
157 151 new_handlers = []
158 152 for handler in handlers:
159 153 pattern = url_path_join(base_project_url, handler[0])
160 154 new_handler = tuple([pattern]+list(handler[1:]))
161 155 new_handlers.append( new_handler )
162 156
163 157 super(NotebookWebApplication, self).__init__(new_handlers, **settings)
164 158
165 159 self.kernel_manager = kernel_manager
166 160 self.notebook_manager = notebook_manager
167 161 self.cluster_manager = cluster_manager
168 162 self.ipython_app = ipython_app
169 163 self.read_only = self.ipython_app.read_only
170 164 self.log = log
171 165
172 166
173 167 #-----------------------------------------------------------------------------
174 168 # Aliases and Flags
175 169 #-----------------------------------------------------------------------------
176 170
177 171 flags = dict(ipkernel_flags)
178 172 flags['no-browser']=(
179 173 {'NotebookApp' : {'open_browser' : False}},
180 174 "Don't open the notebook in a browser after startup."
181 175 )
182 176 flags['no-mathjax']=(
183 177 {'NotebookApp' : {'enable_mathjax' : False}},
184 178 """Disable MathJax
185 179
186 180 MathJax is the javascript library IPython uses to render math/LaTeX. It is
187 181 very large, so you may want to disable it if you have a slow internet
188 182 connection, or for offline use of the notebook.
189 183
190 184 When disabled, equations etc. will appear as their untransformed TeX source.
191 185 """
192 186 )
193 187 flags['read-only'] = (
194 188 {'NotebookApp' : {'read_only' : True}},
195 189 """Allow read-only access to notebooks.
196 190
197 191 When using a password to protect the notebook server, this flag
198 192 allows unauthenticated clients to view the notebook list, and
199 193 individual notebooks, but not edit them, start kernels, or run
200 194 code.
201 195
202 196 If no password is set, the server will be entirely read-only.
203 197 """
204 198 )
205 199
206 200 # Add notebook manager flags
207 201 flags.update(boolean_flag('script', 'NotebookManager.save_script',
208 202 'Auto-save a .py script everytime the .ipynb notebook is saved',
209 203 'Do not auto-save .py scripts for every notebook'))
210 204
211 205 # the flags that are specific to the frontend
212 206 # these must be scrubbed before being passed to the kernel,
213 207 # or it will raise an error on unrecognized flags
214 208 notebook_flags = ['no-browser', 'no-mathjax', 'read-only', 'script', 'no-script']
215 209
216 210 aliases = dict(ipkernel_aliases)
217 211
218 212 aliases.update({
219 213 'ip': 'NotebookApp.ip',
220 214 'port': 'NotebookApp.port',
221 215 'keyfile': 'NotebookApp.keyfile',
222 216 'certfile': 'NotebookApp.certfile',
223 217 'notebook-dir': 'NotebookManager.notebook_dir',
224 218 'browser': 'NotebookApp.browser',
225 219 })
226 220
227 221 # remove ipkernel flags that are singletons, and don't make sense in
228 222 # multi-kernel evironment:
229 223 aliases.pop('f', None)
230 224
231 225 notebook_aliases = [u'port', u'ip', u'keyfile', u'certfile',
232 226 u'notebook-dir']
233 227
234 228 #-----------------------------------------------------------------------------
235 229 # NotebookApp
236 230 #-----------------------------------------------------------------------------
237 231
238 232 class NotebookApp(BaseIPythonApplication):
239 233
240 234 name = 'ipython-notebook'
241 235 default_config_file_name='ipython_notebook_config.py'
242 236
243 237 description = """
244 238 The IPython HTML Notebook.
245 239
246 240 This launches a Tornado based HTML Notebook Server that serves up an
247 241 HTML5/Javascript Notebook client.
248 242 """
249 243 examples = _examples
250 244
251 245 classes = [IPKernelApp, ZMQInteractiveShell, ProfileDir, Session,
252 246 MappingKernelManager, NotebookManager]
253 247 flags = Dict(flags)
254 248 aliases = Dict(aliases)
255 249
256 250 kernel_argv = List(Unicode)
257 251
258 252 log_level = Enum((0,10,20,30,40,50,'DEBUG','INFO','WARN','ERROR','CRITICAL'),
259 253 default_value=logging.INFO,
260 254 config=True,
261 255 help="Set the log level by value or name.")
262 256
263 257 # create requested profiles by default, if they don't exist:
264 258 auto_create = Bool(True)
265 259
266 260 # Network related information.
267 261
268 262 ip = Unicode(LOCALHOST, config=True,
269 263 help="The IP address the notebook server will listen on."
270 264 )
271 265
272 266 def _ip_changed(self, name, old, new):
273 267 if new == u'*': self.ip = u''
274 268
275 269 port = Integer(8888, config=True,
276 270 help="The port the notebook server will listen on."
277 271 )
278 272
279 273 certfile = Unicode(u'', config=True,
280 274 help="""The full path to an SSL/TLS certificate file."""
281 275 )
282 276
283 277 keyfile = Unicode(u'', config=True,
284 278 help="""The full path to a private key file for usage with SSL/TLS."""
285 279 )
286 280
287 281 password = Unicode(u'', config=True,
288 282 help="""Hashed password to use for web authentication.
289 283
290 284 To generate, type in a python/IPython shell:
291 285
292 286 from IPython.lib import passwd; passwd()
293 287
294 288 The string should be of the form type:salt:hashed-password.
295 289 """
296 290 )
297 291
298 292 open_browser = Bool(True, config=True,
299 293 help="""Whether to open in a browser after starting.
300 294 The specific browser used is platform dependent and
301 295 determined by the python standard library `webbrowser`
302 296 module, unless it is overridden using the --browser
303 297 (NotebookApp.browser) configuration option.
304 298 """)
305 299
306 300 browser = Unicode(u'', config=True,
307 301 help="""Specify what command to use to invoke a web
308 302 browser when opening the notebook. If not specified, the
309 303 default browser will be determined by the `webbrowser`
310 304 standard library module, which allows setting of the
311 305 BROWSER environment variable to override it.
312 306 """)
313 307
314 308 read_only = Bool(False, config=True,
315 309 help="Whether to prevent editing/execution of notebooks."
316 310 )
317 311
318 312 webapp_settings = Dict(config=True,
319 313 help="Supply overrides for the tornado.web.Application that the "
320 314 "IPython notebook uses.")
321 315
322 316 enable_mathjax = Bool(True, config=True,
323 317 help="""Whether to enable MathJax for typesetting math/TeX
324 318
325 319 MathJax is the javascript library IPython uses to render math/LaTeX. It is
326 320 very large, so you may want to disable it if you have a slow internet
327 321 connection, or for offline use of the notebook.
328 322
329 323 When disabled, equations etc. will appear as their untransformed TeX source.
330 324 """
331 325 )
332 326 def _enable_mathjax_changed(self, name, old, new):
333 327 """set mathjax url to empty if mathjax is disabled"""
334 328 if not new:
335 329 self.mathjax_url = u''
336 330
337 331 base_project_url = Unicode('/', config=True,
338 332 help='''The base URL for the notebook server''')
339 333 base_kernel_url = Unicode('/', config=True,
340 334 help='''The base URL for the kernel server''')
341 335 websocket_host = Unicode("", config=True,
342 336 help="""The hostname for the websocket server."""
343 337 )
344 338
345 339 mathjax_url = Unicode("", config=True,
346 340 help="""The url for MathJax.js."""
347 341 )
348 342 def _mathjax_url_default(self):
349 343 if not self.enable_mathjax:
350 344 return u''
351 345 static_path = self.webapp_settings.get("static_path", os.path.join(os.path.dirname(__file__), "static"))
352 346 static_url_prefix = self.webapp_settings.get("static_url_prefix",
353 347 "/static/")
354 348 if os.path.exists(os.path.join(static_path, 'mathjax', "MathJax.js")):
355 349 self.log.info("Using local MathJax")
356 350 return static_url_prefix+u"mathjax/MathJax.js"
357 351 else:
358 352 self.log.info("Using MathJax from CDN")
359 353 hostname = "cdn.mathjax.org"
360 354 try:
361 355 # resolve mathjax cdn alias to cloudfront, because Amazon's SSL certificate
362 356 # only works on *.cloudfront.net
363 357 true_host, aliases, IPs = socket.gethostbyname_ex(hostname)
364 358 # I've run this on a few machines, and some put the right answer in true_host,
365 359 # while others gave it in the aliases list, so we check both.
366 360 aliases.insert(0, true_host)
367 361 except Exception:
368 362 self.log.warn("Couldn't determine MathJax CDN info")
369 363 else:
370 364 for alias in aliases:
371 365 parts = alias.split('.')
372 366 # want static foo.cloudfront.net, not dynamic foo.lax3.cloudfront.net
373 367 if len(parts) == 3 and alias.endswith(".cloudfront.net"):
374 368 hostname = alias
375 369 break
376 370
377 371 if not hostname.endswith(".cloudfront.net"):
378 372 self.log.error("Couldn't resolve CloudFront host, required for HTTPS MathJax.")
379 373 self.log.error("Loading from https://cdn.mathjax.org will probably fail due to invalid certificate.")
380 374 self.log.error("For unsecured HTTP access to MathJax use config:")
381 375 self.log.error("NotebookApp.mathjax_url='http://cdn.mathjax.org/mathjax/latest/MathJax.js'")
382 376 return u"https://%s/mathjax/latest/MathJax.js" % hostname
383 377
384 378 def _mathjax_url_changed(self, name, old, new):
385 379 if new and not self.enable_mathjax:
386 380 # enable_mathjax=False overrides mathjax_url
387 381 self.mathjax_url = u''
388 382 else:
389 383 self.log.info("Using MathJax: %s", new)
390 384
391 385 def parse_command_line(self, argv=None):
392 386 super(NotebookApp, self).parse_command_line(argv)
393 387 if argv is None:
394 388 argv = sys.argv[1:]
395 389
396 390 # Scrub frontend-specific flags
397 391 self.kernel_argv = swallow_argv(argv, notebook_aliases, notebook_flags)
398 392 # Kernel should inherit default config file from frontend
399 393 self.kernel_argv.append("--KernelApp.parent_appname='%s'"%self.name)
400 394
401 395 def init_configurables(self):
402 396 # force Session default to be secure
403 397 default_secure(self.config)
404 398 # Create a KernelManager and start a kernel.
405 399 self.kernel_manager = MappingKernelManager(
406 400 config=self.config, log=self.log, kernel_argv=self.kernel_argv,
407 401 connection_dir = self.profile_dir.security_dir,
408 402 )
409 403 self.notebook_manager = NotebookManager(config=self.config, log=self.log)
410 404 self.notebook_manager.list_notebooks()
411 405 self.cluster_manager = ClusterManager(config=self.config, log=self.log)
412 406 self.cluster_manager.update_profiles()
413 407
414 408 def init_logging(self):
415 409 super(NotebookApp, self).init_logging()
416 410 # This prevents double log messages because tornado use a root logger that
417 411 # self.log is a child of. The logging module dipatches log messages to a log
418 412 # and all of its ancenstors until propagate is set to False.
419 413 self.log.propagate = False
420 414
421 415 def init_webapp(self):
422 416 """initialize tornado webapp and httpserver"""
423 417 self.web_app = NotebookWebApplication(
424 418 self, self.kernel_manager, self.notebook_manager,
425 419 self.cluster_manager, self.log,
426 420 self.base_project_url, self.webapp_settings
427 421 )
428 422 if self.certfile:
429 423 ssl_options = dict(certfile=self.certfile)
430 424 if self.keyfile:
431 425 ssl_options['keyfile'] = self.keyfile
432 426 else:
433 427 ssl_options = None
434 428 self.web_app.password = self.password
435 429 self.http_server = httpserver.HTTPServer(self.web_app, ssl_options=ssl_options)
436 430 if ssl_options is None and not self.ip and not (self.read_only and not self.password):
437 431 self.log.critical('WARNING: the notebook server is listening on all IP addresses '
438 432 'but not using any encryption or authentication. This is highly '
439 433 'insecure and not recommended.')
440 434
441 435 # Try random ports centered around the default.
442 436 from random import randint
443 437 n = 50 # Max number of attempts, keep reasonably large.
444 438 for port in range(self.port, self.port+5) + [self.port + randint(-2*n, 2*n) for i in range(n-5)]:
445 439 try:
446 440 self.http_server.listen(port, self.ip)
447 441 except socket.error, e:
448 442 if e.errno != errno.EADDRINUSE:
449 443 raise
450 444 self.log.info('The port %i is already in use, trying another random port.' % port)
451 445 else:
452 446 self.port = port
453 447 break
454 448
455 449 def init_signal(self):
456 450 # FIXME: remove this check when pyzmq dependency is >= 2.1.11
457 451 # safely extract zmq version info:
458 452 try:
459 453 zmq_v = zmq.pyzmq_version_info()
460 454 except AttributeError:
461 455 zmq_v = [ int(n) for n in re.findall(r'\d+', zmq.__version__) ]
462 456 if 'dev' in zmq.__version__:
463 457 zmq_v.append(999)
464 458 zmq_v = tuple(zmq_v)
465 459 if zmq_v >= (2,1,9):
466 460 # This won't work with 2.1.7 and
467 461 # 2.1.9-10 will log ugly 'Interrupted system call' messages,
468 462 # but it will work
469 463 signal.signal(signal.SIGINT, self._handle_sigint)
470 464 signal.signal(signal.SIGTERM, self._signal_stop)
471 465
472 466 def _handle_sigint(self, sig, frame):
473 467 """SIGINT handler spawns confirmation dialog"""
474 468 # register more forceful signal handler for ^C^C case
475 469 signal.signal(signal.SIGINT, self._signal_stop)
476 470 # request confirmation dialog in bg thread, to avoid
477 471 # blocking the App
478 472 thread = threading.Thread(target=self._confirm_exit)
479 473 thread.daemon = True
480 474 thread.start()
481 475
482 476 def _restore_sigint_handler(self):
483 477 """callback for restoring original SIGINT handler"""
484 478 signal.signal(signal.SIGINT, self._handle_sigint)
485 479
486 480 def _confirm_exit(self):
487 481 """confirm shutdown on ^C
488 482
489 483 A second ^C, or answering 'y' within 5s will cause shutdown,
490 484 otherwise original SIGINT handler will be restored.
491 485 """
492 486 # FIXME: remove this delay when pyzmq dependency is >= 2.1.11
493 487 time.sleep(0.1)
494 488 sys.stdout.write("Shutdown Notebook Server (y/[n])? ")
495 489 sys.stdout.flush()
496 490 r,w,x = select.select([sys.stdin], [], [], 5)
497 491 if r:
498 492 line = sys.stdin.readline()
499 493 if line.lower().startswith('y'):
500 494 self.log.critical("Shutdown confirmed")
501 495 ioloop.IOLoop.instance().stop()
502 496 return
503 497 else:
504 498 print "No answer for 5s:",
505 499 print "resuming operation..."
506 500 # no answer, or answer is no:
507 501 # set it back to original SIGINT handler
508 502 # use IOLoop.add_callback because signal.signal must be called
509 503 # from main thread
510 504 ioloop.IOLoop.instance().add_callback(self._restore_sigint_handler)
511 505
512 506 def _signal_stop(self, sig, frame):
513 507 self.log.critical("received signal %s, stopping", sig)
514 508 ioloop.IOLoop.instance().stop()
515 509
516 510 @catch_config_error
517 511 def initialize(self, argv=None):
518 512 super(NotebookApp, self).initialize(argv)
519 513 self.init_configurables()
520 514 self.init_webapp()
521 515 self.init_signal()
522 516
523 517 def cleanup_kernels(self):
524 518 """shutdown all kernels
525 519
526 520 The kernels will shutdown themselves when this process no longer exists,
527 521 but explicit shutdown allows the KernelManagers to cleanup the connection files.
528 522 """
529 523 self.log.info('Shutting down kernels')
530 524 km = self.kernel_manager
531 525 # copy list, since kill_kernel deletes keys
532 526 for kid in list(km.kernel_ids):
533 527 km.kill_kernel(kid)
534 528
535 529 def start(self):
536 530 ip = self.ip if self.ip else '[all ip addresses on your system]'
537 531 proto = 'https' if self.certfile else 'http'
538 532 info = self.log.info
539 533 info("The IPython Notebook is running at: %s://%s:%i%s" %
540 534 (proto, ip, self.port,self.base_project_url) )
541 535 info("Use Control-C to stop this server and shut down all kernels.")
542 536
543 537 if self.open_browser:
544 538 ip = self.ip or '127.0.0.1'
545 539 if self.browser:
546 540 browser = webbrowser.get(self.browser)
547 541 else:
548 542 browser = webbrowser.get()
549 543 b = lambda : browser.open("%s://%s:%i%s" % (proto, ip, self.port,
550 544 self.base_project_url),
551 545 new=2)
552 546 threading.Thread(target=b).start()
553 547 try:
554 548 ioloop.IOLoop.instance().start()
555 549 except KeyboardInterrupt:
556 550 info("Interrupted...")
557 551 finally:
558 552 self.cleanup_kernels()
559 553
560 554
561 555 #-----------------------------------------------------------------------------
562 556 # Main entry point
563 557 #-----------------------------------------------------------------------------
564 558
565 559 def launch_new_instance():
566 560 app = NotebookApp.instance()
567 561 app.initialize()
568 562 app.start()
569 563
@@ -1,39 +1,68 b''
1 1 #-----------------------------------------------------------------------------
2 2 # Copyright (C) 2010-2011 The IPython Development Team
3 3 #
4 4 # Distributed under the terms of the BSD License. The full license is in
5 5 # the file COPYING.txt, distributed as part of this software.
6 6 #-----------------------------------------------------------------------------
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Verify zmq version dependency >= 2.1.4
10 10 #-----------------------------------------------------------------------------
11 11
12 12 import warnings
13 13 from distutils.version import LooseVersion as V
14 14
15
16 def patch_pyzmq():
17 """backport a few patches from newer pyzmq
18
19 These can be removed as we bump our minimum pyzmq version
20 """
21
22 import zmq
23
24 # ioloop.install, introduced in pyzmq 2.1.7
25 from zmq.eventloop import ioloop
26
27 def install():
28 import tornado.ioloop
29 tornado.ioloop.IOLoop = ioloop.IOLoop
30
31 if not hasattr(ioloop, 'install'):
32 ioloop.install = install
33
34 # fix missing DEALER/ROUTER aliases in pyzmq < 2.1.9
35 if not hasattr(zmq, 'DEALER'):
36 zmq.DEALER = zmq.XREQ
37 if not hasattr(zmq, 'ROUTER'):
38 zmq.ROUTER = zmq.XREP
39
40 # fallback on stdlib json if jsonlib is selected, because jsonlib breaks things.
41 # jsonlib support is removed from pyzmq >= 2.2.0
42
43 from zmq.utils import jsonapi
44 if jsonapi.jsonmod.__name__ == 'jsonlib':
45 import json
46 jsonapi.jsonmod = json
47
48
15 49 def check_for_zmq(minimum_version, module='IPython.zmq'):
16 50 try:
17 51 import zmq
18 52 except ImportError:
19 53 raise ImportError("%s requires pyzmq >= %s"%(module, minimum_version))
20 54
21 55 pyzmq_version = zmq.__version__
22 56
23 57 if 'dev' not in pyzmq_version and V(pyzmq_version) < V(minimum_version):
24 58 raise ImportError("%s requires pyzmq >= %s, but you have %s"%(
25 59 module, minimum_version, pyzmq_version))
26 60
27 # fix missing DEALER/ROUTER aliases in pyzmq < 2.1.9
28 if not hasattr(zmq, 'DEALER'):
29 zmq.DEALER = zmq.XREQ
30 if not hasattr(zmq, 'ROUTER'):
31 zmq.ROUTER = zmq.XREP
32
33 61 if V(zmq.zmq_version()) >= V('4.0.0'):
34 62 warnings.warn("""libzmq 4 detected.
35 63 It is unlikely that IPython's zmq code will work properly.
36 64 Please install libzmq stable, which is 2.1.x or 2.2.x""",
37 65 RuntimeWarning)
38 66
39 67 check_for_zmq('2.1.4')
68 patch_pyzmq()
@@ -1,768 +1,756 b''
1 1 """Session object for building, serializing, sending, and receiving messages in
2 2 IPython. The Session object supports serialization, HMAC signatures, and
3 3 metadata on messages.
4 4
5 5 Also defined here are utilities for working with Sessions:
6 6 * A SessionFactory to be used as a base class for configurables that work with
7 7 Sessions.
8 8 * A Message object for convenience that allows attribute-access to the msg dict.
9 9
10 10 Authors:
11 11
12 12 * Min RK
13 13 * Brian Granger
14 14 * Fernando Perez
15 15 """
16 16 #-----------------------------------------------------------------------------
17 17 # Copyright (C) 2010-2011 The IPython Development Team
18 18 #
19 19 # Distributed under the terms of the BSD License. The full license is in
20 20 # the file COPYING, distributed as part of this software.
21 21 #-----------------------------------------------------------------------------
22 22
23 23 #-----------------------------------------------------------------------------
24 24 # Imports
25 25 #-----------------------------------------------------------------------------
26 26
27 27 import hmac
28 28 import logging
29 29 import os
30 30 import pprint
31 31 import uuid
32 32 from datetime import datetime
33 33
34 34 try:
35 35 import cPickle
36 36 pickle = cPickle
37 37 except:
38 38 cPickle = None
39 39 import pickle
40 40
41 41 import zmq
42 42 from zmq.utils import jsonapi
43 43 from zmq.eventloop.ioloop import IOLoop
44 44 from zmq.eventloop.zmqstream import ZMQStream
45 45
46 46 from IPython.config.application import Application, boolean_flag
47 47 from IPython.config.configurable import Configurable, LoggingConfigurable
48 48 from IPython.utils.importstring import import_item
49 49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 50 from IPython.utils.py3compat import str_to_bytes
51 51 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
52 52 DottedObjectName, CUnicode)
53 53
54 54 #-----------------------------------------------------------------------------
55 55 # utility functions
56 56 #-----------------------------------------------------------------------------
57 57
58 58 def squash_unicode(obj):
59 59 """coerce unicode back to bytestrings."""
60 60 if isinstance(obj,dict):
61 61 for key in obj.keys():
62 62 obj[key] = squash_unicode(obj[key])
63 63 if isinstance(key, unicode):
64 64 obj[squash_unicode(key)] = obj.pop(key)
65 65 elif isinstance(obj, list):
66 66 for i,v in enumerate(obj):
67 67 obj[i] = squash_unicode(v)
68 68 elif isinstance(obj, unicode):
69 69 obj = obj.encode('utf8')
70 70 return obj
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # globals and defaults
74 74 #-----------------------------------------------------------------------------
75 75
76 76
77 # jsonlib behaves a bit differently, so handle that where it affects us
78 if jsonapi.jsonmod.__name__ == 'jsonlib':
79 # kwarg for serializing unknown types (datetime) is different
80 dumps_kwargs = dict(on_unknown=date_default)
81 # By default, jsonlib unpacks floats as Decimal instead of float,
82 # which can foul things up
83 loads_kwargs = dict(use_float=True)
84 else:
85 # ISO8601-ify datetime objects
86 dumps_kwargs = dict(default=date_default)
87 # nothing to specify for loads
88 loads_kwargs = dict()
89
90 json_packer = lambda obj: jsonapi.dumps(obj, **dumps_kwargs)
91 json_unpacker = lambda s: extract_dates(jsonapi.loads(s, **loads_kwargs))
77 # ISO8601-ify datetime objects
78 json_packer = lambda obj: jsonapi.dumps(obj, default=date_default)
79 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
92 80
93 81 pickle_packer = lambda o: pickle.dumps(o,-1)
94 82 pickle_unpacker = pickle.loads
95 83
96 84 default_packer = json_packer
97 85 default_unpacker = json_unpacker
98 86
99 87 DELIM=b"<IDS|MSG>"
100 88
101 89
102 90 #-----------------------------------------------------------------------------
103 91 # Mixin tools for apps that use Sessions
104 92 #-----------------------------------------------------------------------------
105 93
106 94 session_aliases = dict(
107 95 ident = 'Session.session',
108 96 user = 'Session.username',
109 97 keyfile = 'Session.keyfile',
110 98 )
111 99
112 100 session_flags = {
113 101 'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())),
114 102 'keyfile' : '' }},
115 103 """Use HMAC digests for authentication of messages.
116 104 Setting this flag will generate a new UUID to use as the HMAC key.
117 105 """),
118 106 'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }},
119 107 """Don't authenticate messages."""),
120 108 }
121 109
122 110 def default_secure(cfg):
123 111 """Set the default behavior for a config environment to be secure.
124 112
125 113 If Session.key/keyfile have not been set, set Session.key to
126 114 a new random UUID.
127 115 """
128 116
129 117 if 'Session' in cfg:
130 118 if 'key' in cfg.Session or 'keyfile' in cfg.Session:
131 119 return
132 120 # key/keyfile not specified, generate new UUID:
133 121 cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
134 122
135 123
136 124 #-----------------------------------------------------------------------------
137 125 # Classes
138 126 #-----------------------------------------------------------------------------
139 127
140 128 class SessionFactory(LoggingConfigurable):
141 129 """The Base class for configurables that have a Session, Context, logger,
142 130 and IOLoop.
143 131 """
144 132
145 133 logname = Unicode('')
146 134 def _logname_changed(self, name, old, new):
147 135 self.log = logging.getLogger(new)
148 136
149 137 # not configurable:
150 138 context = Instance('zmq.Context')
151 139 def _context_default(self):
152 140 return zmq.Context.instance()
153 141
154 142 session = Instance('IPython.zmq.session.Session')
155 143
156 144 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
157 145 def _loop_default(self):
158 146 return IOLoop.instance()
159 147
160 148 def __init__(self, **kwargs):
161 149 super(SessionFactory, self).__init__(**kwargs)
162 150
163 151 if self.session is None:
164 152 # construct the session
165 153 self.session = Session(**kwargs)
166 154
167 155
168 156 class Message(object):
169 157 """A simple message object that maps dict keys to attributes.
170 158
171 159 A Message can be created from a dict and a dict from a Message instance
172 160 simply by calling dict(msg_obj)."""
173 161
174 162 def __init__(self, msg_dict):
175 163 dct = self.__dict__
176 164 for k, v in dict(msg_dict).iteritems():
177 165 if isinstance(v, dict):
178 166 v = Message(v)
179 167 dct[k] = v
180 168
181 169 # Having this iterator lets dict(msg_obj) work out of the box.
182 170 def __iter__(self):
183 171 return iter(self.__dict__.iteritems())
184 172
185 173 def __repr__(self):
186 174 return repr(self.__dict__)
187 175
188 176 def __str__(self):
189 177 return pprint.pformat(self.__dict__)
190 178
191 179 def __contains__(self, k):
192 180 return k in self.__dict__
193 181
194 182 def __getitem__(self, k):
195 183 return self.__dict__[k]
196 184
197 185
198 186 def msg_header(msg_id, msg_type, username, session):
199 187 date = datetime.now()
200 188 return locals()
201 189
202 190 def extract_header(msg_or_header):
203 191 """Given a message or header, return the header."""
204 192 if not msg_or_header:
205 193 return {}
206 194 try:
207 195 # See if msg_or_header is the entire message.
208 196 h = msg_or_header['header']
209 197 except KeyError:
210 198 try:
211 199 # See if msg_or_header is just the header
212 200 h = msg_or_header['msg_id']
213 201 except KeyError:
214 202 raise
215 203 else:
216 204 h = msg_or_header
217 205 if not isinstance(h, dict):
218 206 h = dict(h)
219 207 return h
220 208
221 209 class Session(Configurable):
222 210 """Object for handling serialization and sending of messages.
223 211
224 212 The Session object handles building messages and sending them
225 213 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
226 214 other over the network via Session objects, and only need to work with the
227 215 dict-based IPython message spec. The Session will handle
228 216 serialization/deserialization, security, and metadata.
229 217
230 218 Sessions support configurable serialiization via packer/unpacker traits,
231 219 and signing with HMAC digests via the key/keyfile traits.
232 220
233 221 Parameters
234 222 ----------
235 223
236 224 debug : bool
237 225 whether to trigger extra debugging statements
238 226 packer/unpacker : str : 'json', 'pickle' or import_string
239 227 importstrings for methods to serialize message parts. If just
240 228 'json' or 'pickle', predefined JSON and pickle packers will be used.
241 229 Otherwise, the entire importstring must be used.
242 230
243 231 The functions must accept at least valid JSON input, and output *bytes*.
244 232
245 233 For example, to use msgpack:
246 234 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
247 235 pack/unpack : callables
248 236 You can also set the pack/unpack callables for serialization directly.
249 237 session : bytes
250 238 the ID of this Session object. The default is to generate a new UUID.
251 239 username : unicode
252 240 username added to message headers. The default is to ask the OS.
253 241 key : bytes
254 242 The key used to initialize an HMAC signature. If unset, messages
255 243 will not be signed or checked.
256 244 keyfile : filepath
257 245 The file containing a key. If this is set, `key` will be initialized
258 246 to the contents of the file.
259 247
260 248 """
261 249
262 250 debug=Bool(False, config=True, help="""Debug output in the Session""")
263 251
264 252 packer = DottedObjectName('json',config=True,
265 253 help="""The name of the packer for serializing messages.
266 254 Should be one of 'json', 'pickle', or an import name
267 255 for a custom callable serializer.""")
268 256 def _packer_changed(self, name, old, new):
269 257 if new.lower() == 'json':
270 258 self.pack = json_packer
271 259 self.unpack = json_unpacker
272 260 elif new.lower() == 'pickle':
273 261 self.pack = pickle_packer
274 262 self.unpack = pickle_unpacker
275 263 else:
276 264 self.pack = import_item(str(new))
277 265
278 266 unpacker = DottedObjectName('json', config=True,
279 267 help="""The name of the unpacker for unserializing messages.
280 268 Only used with custom functions for `packer`.""")
281 269 def _unpacker_changed(self, name, old, new):
282 270 if new.lower() == 'json':
283 271 self.pack = json_packer
284 272 self.unpack = json_unpacker
285 273 elif new.lower() == 'pickle':
286 274 self.pack = pickle_packer
287 275 self.unpack = pickle_unpacker
288 276 else:
289 277 self.unpack = import_item(str(new))
290 278
291 279 session = CUnicode(u'', config=True,
292 280 help="""The UUID identifying this session.""")
293 281 def _session_default(self):
294 282 u = unicode(uuid.uuid4())
295 283 self.bsession = u.encode('ascii')
296 284 return u
297 285
298 286 def _session_changed(self, name, old, new):
299 287 self.bsession = self.session.encode('ascii')
300 288
301 289 # bsession is the session as bytes
302 290 bsession = CBytes(b'')
303 291
304 292 username = Unicode(os.environ.get('USER',u'username'), config=True,
305 293 help="""Username for the Session. Default is your system username.""")
306 294
307 295 # message signature related traits:
308 296
309 297 key = CBytes(b'', config=True,
310 298 help="""execution key, for extra authentication.""")
311 299 def _key_changed(self, name, old, new):
312 300 if new:
313 301 self.auth = hmac.HMAC(new)
314 302 else:
315 303 self.auth = None
316 304 auth = Instance(hmac.HMAC)
317 305 digest_history = Set()
318 306
319 307 keyfile = Unicode('', config=True,
320 308 help="""path to file containing execution key.""")
321 309 def _keyfile_changed(self, name, old, new):
322 310 with open(new, 'rb') as f:
323 311 self.key = f.read().strip()
324 312
325 313 # serialization traits:
326 314
327 315 pack = Any(default_packer) # the actual packer function
328 316 def _pack_changed(self, name, old, new):
329 317 if not callable(new):
330 318 raise TypeError("packer must be callable, not %s"%type(new))
331 319
332 320 unpack = Any(default_unpacker) # the actual packer function
333 321 def _unpack_changed(self, name, old, new):
334 322 # unpacker is not checked - it is assumed to be
335 323 if not callable(new):
336 324 raise TypeError("unpacker must be callable, not %s"%type(new))
337 325
338 326 def __init__(self, **kwargs):
339 327 """create a Session object
340 328
341 329 Parameters
342 330 ----------
343 331
344 332 debug : bool
345 333 whether to trigger extra debugging statements
346 334 packer/unpacker : str : 'json', 'pickle' or import_string
347 335 importstrings for methods to serialize message parts. If just
348 336 'json' or 'pickle', predefined JSON and pickle packers will be used.
349 337 Otherwise, the entire importstring must be used.
350 338
351 339 The functions must accept at least valid JSON input, and output
352 340 *bytes*.
353 341
354 342 For example, to use msgpack:
355 343 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
356 344 pack/unpack : callables
357 345 You can also set the pack/unpack callables for serialization
358 346 directly.
359 347 session : unicode (must be ascii)
360 348 the ID of this Session object. The default is to generate a new
361 349 UUID.
362 350 bsession : bytes
363 351 The session as bytes
364 352 username : unicode
365 353 username added to message headers. The default is to ask the OS.
366 354 key : bytes
367 355 The key used to initialize an HMAC signature. If unset, messages
368 356 will not be signed or checked.
369 357 keyfile : filepath
370 358 The file containing a key. If this is set, `key` will be
371 359 initialized to the contents of the file.
372 360 """
373 361 super(Session, self).__init__(**kwargs)
374 362 self._check_packers()
375 363 self.none = self.pack({})
376 364 # ensure self._session_default() if necessary, so bsession is defined:
377 365 self.session
378 366
379 367 @property
380 368 def msg_id(self):
381 369 """always return new uuid"""
382 370 return str(uuid.uuid4())
383 371
384 372 def _check_packers(self):
385 373 """check packers for binary data and datetime support."""
386 374 pack = self.pack
387 375 unpack = self.unpack
388 376
389 377 # check simple serialization
390 378 msg = dict(a=[1,'hi'])
391 379 try:
392 380 packed = pack(msg)
393 381 except Exception:
394 382 raise ValueError("packer could not serialize a simple message")
395 383
396 384 # ensure packed message is bytes
397 385 if not isinstance(packed, bytes):
398 386 raise ValueError("message packed to %r, but bytes are required"%type(packed))
399 387
400 388 # check that unpack is pack's inverse
401 389 try:
402 390 unpacked = unpack(packed)
403 391 except Exception:
404 392 raise ValueError("unpacker could not handle the packer's output")
405 393
406 394 # check datetime support
407 395 msg = dict(t=datetime.now())
408 396 try:
409 397 unpacked = unpack(pack(msg))
410 398 except Exception:
411 399 self.pack = lambda o: pack(squash_dates(o))
412 400 self.unpack = lambda s: extract_dates(unpack(s))
413 401
414 402 def msg_header(self, msg_type):
415 403 return msg_header(self.msg_id, msg_type, self.username, self.session)
416 404
417 405 def msg(self, msg_type, content=None, parent=None, subheader=None, header=None):
418 406 """Return the nested message dict.
419 407
420 408 This format is different from what is sent over the wire. The
421 409 serialize/unserialize methods converts this nested message dict to the wire
422 410 format, which is a list of message parts.
423 411 """
424 412 msg = {}
425 413 header = self.msg_header(msg_type) if header is None else header
426 414 msg['header'] = header
427 415 msg['msg_id'] = header['msg_id']
428 416 msg['msg_type'] = header['msg_type']
429 417 msg['parent_header'] = {} if parent is None else extract_header(parent)
430 418 msg['content'] = {} if content is None else content
431 419 sub = {} if subheader is None else subheader
432 420 msg['header'].update(sub)
433 421 return msg
434 422
435 423 def sign(self, msg_list):
436 424 """Sign a message with HMAC digest. If no auth, return b''.
437 425
438 426 Parameters
439 427 ----------
440 428 msg_list : list
441 429 The [p_header,p_parent,p_content] part of the message list.
442 430 """
443 431 if self.auth is None:
444 432 return b''
445 433 h = self.auth.copy()
446 434 for m in msg_list:
447 435 h.update(m)
448 436 return str_to_bytes(h.hexdigest())
449 437
450 438 def serialize(self, msg, ident=None):
451 439 """Serialize the message components to bytes.
452 440
453 441 This is roughly the inverse of unserialize. The serialize/unserialize
454 442 methods work with full message lists, whereas pack/unpack work with
455 443 the individual message parts in the message list.
456 444
457 445 Parameters
458 446 ----------
459 447 msg : dict or Message
460 448 The nexted message dict as returned by the self.msg method.
461 449
462 450 Returns
463 451 -------
464 452 msg_list : list
465 453 The list of bytes objects to be sent with the format:
466 454 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
467 455 buffer1,buffer2,...]. In this list, the p_* entities are
468 456 the packed or serialized versions, so if JSON is used, these
469 457 are utf8 encoded JSON strings.
470 458 """
471 459 content = msg.get('content', {})
472 460 if content is None:
473 461 content = self.none
474 462 elif isinstance(content, dict):
475 463 content = self.pack(content)
476 464 elif isinstance(content, bytes):
477 465 # content is already packed, as in a relayed message
478 466 pass
479 467 elif isinstance(content, unicode):
480 468 # should be bytes, but JSON often spits out unicode
481 469 content = content.encode('utf8')
482 470 else:
483 471 raise TypeError("Content incorrect type: %s"%type(content))
484 472
485 473 real_message = [self.pack(msg['header']),
486 474 self.pack(msg['parent_header']),
487 475 content
488 476 ]
489 477
490 478 to_send = []
491 479
492 480 if isinstance(ident, list):
493 481 # accept list of idents
494 482 to_send.extend(ident)
495 483 elif ident is not None:
496 484 to_send.append(ident)
497 485 to_send.append(DELIM)
498 486
499 487 signature = self.sign(real_message)
500 488 to_send.append(signature)
501 489
502 490 to_send.extend(real_message)
503 491
504 492 return to_send
505 493
506 494 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
507 495 buffers=None, subheader=None, track=False, header=None):
508 496 """Build and send a message via stream or socket.
509 497
510 498 The message format used by this function internally is as follows:
511 499
512 500 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
513 501 buffer1,buffer2,...]
514 502
515 503 The serialize/unserialize methods convert the nested message dict into this
516 504 format.
517 505
518 506 Parameters
519 507 ----------
520 508
521 509 stream : zmq.Socket or ZMQStream
522 510 The socket-like object used to send the data.
523 511 msg_or_type : str or Message/dict
524 512 Normally, msg_or_type will be a msg_type unless a message is being
525 513 sent more than once. If a header is supplied, this can be set to
526 514 None and the msg_type will be pulled from the header.
527 515
528 516 content : dict or None
529 517 The content of the message (ignored if msg_or_type is a message).
530 518 header : dict or None
531 519 The header dict for the message (ignores if msg_to_type is a message).
532 520 parent : Message or dict or None
533 521 The parent or parent header describing the parent of this message
534 522 (ignored if msg_or_type is a message).
535 523 ident : bytes or list of bytes
536 524 The zmq.IDENTITY routing path.
537 525 subheader : dict or None
538 526 Extra header keys for this message's header (ignored if msg_or_type
539 527 is a message).
540 528 buffers : list or None
541 529 The already-serialized buffers to be appended to the message.
542 530 track : bool
543 531 Whether to track. Only for use with Sockets, because ZMQStream
544 532 objects cannot track messages.
545 533
546 534 Returns
547 535 -------
548 536 msg : dict
549 537 The constructed message.
550 538 (msg,tracker) : (dict, MessageTracker)
551 539 if track=True, then a 2-tuple will be returned,
552 540 the first element being the constructed
553 541 message, and the second being the MessageTracker
554 542
555 543 """
556 544
557 545 if not isinstance(stream, (zmq.Socket, ZMQStream)):
558 546 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
559 547 elif track and isinstance(stream, ZMQStream):
560 548 raise TypeError("ZMQStream cannot track messages")
561 549
562 550 if isinstance(msg_or_type, (Message, dict)):
563 551 # We got a Message or message dict, not a msg_type so don't
564 552 # build a new Message.
565 553 msg = msg_or_type
566 554 else:
567 555 msg = self.msg(msg_or_type, content=content, parent=parent,
568 556 subheader=subheader, header=header)
569 557
570 558 buffers = [] if buffers is None else buffers
571 559 to_send = self.serialize(msg, ident)
572 560 flag = 0
573 561 if buffers:
574 562 flag = zmq.SNDMORE
575 563 _track = False
576 564 else:
577 565 _track=track
578 566 if track:
579 567 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
580 568 else:
581 569 tracker = stream.send_multipart(to_send, flag, copy=False)
582 570 for b in buffers[:-1]:
583 571 stream.send(b, flag, copy=False)
584 572 if buffers:
585 573 if track:
586 574 tracker = stream.send(buffers[-1], copy=False, track=track)
587 575 else:
588 576 tracker = stream.send(buffers[-1], copy=False)
589 577
590 578 # omsg = Message(msg)
591 579 if self.debug:
592 580 pprint.pprint(msg)
593 581 pprint.pprint(to_send)
594 582 pprint.pprint(buffers)
595 583
596 584 msg['tracker'] = tracker
597 585
598 586 return msg
599 587
600 588 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
601 589 """Send a raw message via ident path.
602 590
603 591 This method is used to send a already serialized message.
604 592
605 593 Parameters
606 594 ----------
607 595 stream : ZMQStream or Socket
608 596 The ZMQ stream or socket to use for sending the message.
609 597 msg_list : list
610 598 The serialized list of messages to send. This only includes the
611 599 [p_header,p_parent,p_content,buffer1,buffer2,...] portion of
612 600 the message.
613 601 ident : ident or list
614 602 A single ident or a list of idents to use in sending.
615 603 """
616 604 to_send = []
617 605 if isinstance(ident, bytes):
618 606 ident = [ident]
619 607 if ident is not None:
620 608 to_send.extend(ident)
621 609
622 610 to_send.append(DELIM)
623 611 to_send.append(self.sign(msg_list))
624 612 to_send.extend(msg_list)
625 613 stream.send_multipart(msg_list, flags, copy=copy)
626 614
627 615 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
628 616 """Receive and unpack a message.
629 617
630 618 Parameters
631 619 ----------
632 620 socket : ZMQStream or Socket
633 621 The socket or stream to use in receiving.
634 622
635 623 Returns
636 624 -------
637 625 [idents], msg
638 626 [idents] is a list of idents and msg is a nested message dict of
639 627 same format as self.msg returns.
640 628 """
641 629 if isinstance(socket, ZMQStream):
642 630 socket = socket.socket
643 631 try:
644 632 msg_list = socket.recv_multipart(mode)
645 633 except zmq.ZMQError as e:
646 634 if e.errno == zmq.EAGAIN:
647 635 # We can convert EAGAIN to None as we know in this case
648 636 # recv_multipart won't return None.
649 637 return None,None
650 638 else:
651 639 raise
652 640 # split multipart message into identity list and message dict
653 641 # invalid large messages can cause very expensive string comparisons
654 642 idents, msg_list = self.feed_identities(msg_list, copy)
655 643 try:
656 644 return idents, self.unserialize(msg_list, content=content, copy=copy)
657 645 except Exception as e:
658 646 # TODO: handle it
659 647 raise e
660 648
661 649 def feed_identities(self, msg_list, copy=True):
662 650 """Split the identities from the rest of the message.
663 651
664 652 Feed until DELIM is reached, then return the prefix as idents and
665 653 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
666 654 but that would be silly.
667 655
668 656 Parameters
669 657 ----------
670 658 msg_list : a list of Message or bytes objects
671 659 The message to be split.
672 660 copy : bool
673 661 flag determining whether the arguments are bytes or Messages
674 662
675 663 Returns
676 664 -------
677 665 (idents, msg_list) : two lists
678 666 idents will always be a list of bytes, each of which is a ZMQ
679 667 identity. msg_list will be a list of bytes or zmq.Messages of the
680 668 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
681 669 should be unpackable/unserializable via self.unserialize at this
682 670 point.
683 671 """
684 672 if copy:
685 673 idx = msg_list.index(DELIM)
686 674 return msg_list[:idx], msg_list[idx+1:]
687 675 else:
688 676 failed = True
689 677 for idx,m in enumerate(msg_list):
690 678 if m.bytes == DELIM:
691 679 failed = False
692 680 break
693 681 if failed:
694 682 raise ValueError("DELIM not in msg_list")
695 683 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
696 684 return [m.bytes for m in idents], msg_list
697 685
698 686 def unserialize(self, msg_list, content=True, copy=True):
699 687 """Unserialize a msg_list to a nested message dict.
700 688
701 689 This is roughly the inverse of serialize. The serialize/unserialize
702 690 methods work with full message lists, whereas pack/unpack work with
703 691 the individual message parts in the message list.
704 692
705 693 Parameters:
706 694 -----------
707 695 msg_list : list of bytes or Message objects
708 696 The list of message parts of the form [HMAC,p_header,p_parent,
709 697 p_content,buffer1,buffer2,...].
710 698 content : bool (True)
711 699 Whether to unpack the content dict (True), or leave it packed
712 700 (False).
713 701 copy : bool (True)
714 702 Whether to return the bytes (True), or the non-copying Message
715 703 object in each place (False).
716 704
717 705 Returns
718 706 -------
719 707 msg : dict
720 708 The nested message dict with top-level keys [header, parent_header,
721 709 content, buffers].
722 710 """
723 711 minlen = 4
724 712 message = {}
725 713 if not copy:
726 714 for i in range(minlen):
727 715 msg_list[i] = msg_list[i].bytes
728 716 if self.auth is not None:
729 717 signature = msg_list[0]
730 718 if not signature:
731 719 raise ValueError("Unsigned Message")
732 720 if signature in self.digest_history:
733 721 raise ValueError("Duplicate Signature: %r"%signature)
734 722 self.digest_history.add(signature)
735 723 check = self.sign(msg_list[1:4])
736 724 if not signature == check:
737 725 raise ValueError("Invalid Signature: %r"%signature)
738 726 if not len(msg_list) >= minlen:
739 727 raise TypeError("malformed message, must have at least %i elements"%minlen)
740 728 header = self.unpack(msg_list[1])
741 729 message['header'] = header
742 730 message['msg_id'] = header['msg_id']
743 731 message['msg_type'] = header['msg_type']
744 732 message['parent_header'] = self.unpack(msg_list[2])
745 733 if content:
746 734 message['content'] = self.unpack(msg_list[3])
747 735 else:
748 736 message['content'] = msg_list[3]
749 737
750 738 message['buffers'] = msg_list[4:]
751 739 return message
752 740
753 741 def test_msg2obj():
754 742 am = dict(x=1)
755 743 ao = Message(am)
756 744 assert ao.x == am['x']
757 745
758 746 am['y'] = dict(z=1)
759 747 ao = Message(am)
760 748 assert ao.y.z == am['y']['z']
761 749
762 750 k1, k2 = 'y', 'z'
763 751 assert ao[k1][k2] == am[k1][k2]
764 752
765 753 am2 = dict(ao)
766 754 assert am['x'] == am2['x']
767 755 assert am['y']['z'] == am2['y']['z']
768 756
General Comments 0
You need to be logged in to leave comments. Login now