Move protocol_version to core.release
Takafumi Arakaki
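In short, this change moves the kernel protocol version constant out of IPython.zmq.session and into IPython.core.release, so the kernel, the session machinery, and version reporting all read it from a single place. A minimal sketch of how the constant is defined and consumed after this change (module paths and values taken from the diff below):

    # IPython/core/release.py -- new home of the constant
    kernel_protocol_version_info = (4, 0)   # bump when the message protocol changes

    # IPython/zmq/ipkernel.py -- consumers now read it from release
    from IPython.core import release
    protocol_version = list(release.kernel_protocol_version_info)   # [4, 0]
    ipython_version  = list(release.version_info)                    # [0, 14, 0, 'dev'] in this revision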
@@ -1,148 +1,151 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Release data for the IPython project."""
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (c) 2008, IPython Development Team.
6 6 # Copyright (c) 2001, Fernando Perez <fernando.perez@colorado.edu>
7 7 # Copyright (c) 2001, Janko Hauser <jhauser@zscout.de>
8 8 # Copyright (c) 2001, Nathaniel Gray <n8gray@caltech.edu>
9 9 #
10 10 # Distributed under the terms of the Modified BSD License.
11 11 #
12 12 # The full license is in the file COPYING.txt, distributed with this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 # Name of the package for release purposes. This is the name which labels
16 16 # the tarballs and RPMs made by distutils, so it's best to lowercase it.
17 17 name = 'ipython'
18 18
19 19 # IPython version information. An empty _version_extra corresponds to a full
20 20 # release. 'dev' as a _version_extra string means this is a development
21 21 # version
22 22 _version_major = 0
23 23 _version_minor = 14
24 24 _version_micro = 0 # use 0 for first of series, number for 1 and above
25 25 _version_extra = 'dev'
26 26 #_version_extra = 'rc1'
27 27 # _version_extra = '' # Uncomment this for full releases
28 28
29 29 # Construct full version string from these.
30 30 _ver = [_version_major, _version_minor]
31 31 if _version_micro:
32 32 _ver.append(_version_micro)
33 33 if _version_extra:
34 34 _ver.append(_version_extra)
35 35
36 36 __version__ = '.'.join(map(str, _ver))
37 37
38 38 version = __version__ # backwards compatibility name
39 39 version_info = (_version_major, _version_minor, _version_micro, _version_extra)
40 40
41 # Change this when incrementing the kernel protocol version
42 kernel_protocol_version_info = (4, 0)
43
41 44 description = "IPython: Productive Interactive Computing"
42 45
43 46 long_description = \
44 47 """
45 48 IPython provides a rich toolkit to help you make the most out of using Python
46 49 interactively. Its main components are:
47 50
48 51 * Powerful interactive Python shells (terminal- and Qt-based).
49 52 * A web-based interactive notebook environment with all shell features plus
50 53 support for embedded figures, animations and rich media.
51 54 * Support for interactive data visualization and use of GUI toolkits.
52 55 * Flexible, embeddable interpreters to load into your own projects.
53 56 * A high-performance library for high level and interactive parallel computing
54 57 that works in multicore systems, clusters, supercomputing and cloud scenarios.
55 58
56 59 The enhanced interactive Python shells have the following main features:
57 60
58 61 * Comprehensive object introspection.
59 62
60 63 * Input history, persistent across sessions.
61 64
62 65 * Caching of output results during a session with automatically generated
63 66 references.
64 67
65 68 * Extensible tab completion, with support by default for completion of python
66 69 variables and keywords, filenames and function keywords.
67 70
68 71 * Extensible system of 'magic' commands for controlling the environment and
69 72 performing many tasks related either to IPython or the operating system.
70 73
71 74 * A rich configuration system with easy switching between different setups
72 75 (simpler than changing $PYTHONSTARTUP environment variables every time).
73 76
74 77 * Session logging and reloading.
75 78
76 79 * Extensible syntax processing for special purpose situations.
77 80
78 81 * Access to the system shell with user-extensible alias system.
79 82
80 83 * Easily embeddable in other Python programs and GUIs.
81 84
82 85 * Integrated access to the pdb debugger and the Python profiler.
83 86
84 87 The parallel computing architecture has the following main features:
85 88
86 89 * Quickly parallelize Python code from an interactive Python/IPython session.
87 90
88 91 * A flexible and dynamic process model that can be deployed on anything from
89 92 multicore workstations to supercomputers.
90 93
91 94 * An architecture that supports many different styles of parallelism, from
92 95 message passing to task farming.
93 96
94 97 * Both blocking and fully asynchronous interfaces.
95 98
96 99 * High level APIs that enable many things to be parallelized in a few lines
97 100 of code.
98 101
99 102 * Share live parallel jobs with other users securely.
100 103
101 104 * Dynamically load balanced task farming system.
102 105
103 106 * Robust error handling in parallel code.
104 107
105 108 The latest development version is always available from IPython's `GitHub
106 109 site <http://github.com/ipython>`_.
107 110 """
108 111
109 112 license = 'BSD'
110 113
111 114 authors = {'Fernando' : ('Fernando Perez','fperez.net@gmail.com'),
112 115 'Janko' : ('Janko Hauser','jhauser@zscout.de'),
113 116 'Nathan' : ('Nathaniel Gray','n8gray@caltech.edu'),
114 117 'Ville' : ('Ville Vainio','vivainio@gmail.com'),
115 118 'Brian' : ('Brian E Granger', 'ellisonbg@gmail.com'),
116 119 'Min' : ('Min Ragan-Kelley', 'benjaminrk@gmail.com'),
117 120 'Thomas' : ('Thomas A. Kluyver', 'takowl@gmail.com'),
118 121 'Jorgen' : ('Jorgen Stenarson', 'jorgen.stenarson@bostream.nu'),
119 122 'Matthias' : ('Matthias Bussonnier', 'bussonniermatthias@gmail.com'),
120 123 }
121 124
122 125 author = 'The IPython Development Team'
123 126
124 127 author_email = 'ipython-dev@scipy.org'
125 128
126 129 url = 'http://ipython.org'
127 130
128 131 download_url = 'https://github.com/ipython/ipython/downloads'
129 132
130 133 platforms = ['Linux','Mac OSX','Windows XP/2000/NT/Vista/7']
131 134
132 135 keywords = ['Interactive','Interpreter','Shell','Parallel','Distributed',
133 136 'Web-based computing', 'Qt console', 'Embedding']
134 137
135 138 classifiers = [
136 139 'Intended Audience :: Developers',
137 140 'Intended Audience :: Science/Research',
138 141 'License :: OSI Approved :: BSD License',
139 142 'Programming Language :: Python',
140 143 'Programming Language :: Python :: 2',
141 144 'Programming Language :: Python :: 2.6',
142 145 'Programming Language :: Python :: 2.7',
143 146 'Programming Language :: Python :: 3',
144 147 'Programming Language :: Python :: 3.1',
145 148 'Programming Language :: Python :: 3.2',
146 149 'Topic :: System :: Distributed Computing',
147 150 'Topic :: System :: Shells'
148 151 ]
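The version-assembly logic in release.py above is worth tracing once; a standalone sketch using this revision's values (not importing IPython itself) shows what __version__ and version_info come out to:

    # Standalone rerun of the assembly above, using this revision's values.
    _version_major, _version_minor = 0, 14
    _version_micro, _version_extra = 0, 'dev'

    _ver = [_version_major, _version_minor]
    if _version_micro:           # 0 means "first of series", so it is omitted here
        _ver.append(_version_micro)
    if _version_extra:           # an empty string would mean a full release
        _ver.append(_version_extra)

    __version__ = '.'.join(map(str, _ver))
    print(__version__)           # -> 0.14.dev
    version_info = (_version_major, _version_minor, _version_micro, _version_extra)
    print(version_info)          # -> (0, 14, 0, 'dev')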
@@ -1,949 +1,950 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 25 import uuid
26 26
27 27 from datetime import datetime
28 28 from signal import (
29 29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
30 30 )
31 31
32 32 # System library imports
33 33 import zmq
34 34 from zmq.eventloop import ioloop
35 35 from zmq.eventloop.zmqstream import ZMQStream
36 36
37 37 # Local imports
38 import IPython
39 38 from IPython.config.configurable import Configurable
40 39 from IPython.config.application import boolean_flag, catch_config_error
41 40 from IPython.core.application import ProfileDir
42 41 from IPython.core.error import StdinNotImplementedError
42 from IPython.core import release
43 43 from IPython.core.shellapp import (
44 44 InteractiveShellApp, shell_flags, shell_aliases
45 45 )
46 46 from IPython.utils import io
47 47 from IPython.utils import py3compat
48 48 from IPython.utils.frame import extract_module_locals
49 49 from IPython.utils.jsonutil import json_clean
50 50 from IPython.utils.traitlets import (
51 51 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
52 52 )
53 53
54 54 from entry_point import base_launch_kernel
55 55 from kernelapp import KernelApp, kernel_flags, kernel_aliases
56 56 from serialize import serialize_object, unpack_apply_message
57 from session import Session, Message, protocol_version
57 from session import Session, Message
58 58 from zmqshell import ZMQInteractiveShell
59 59
60 60
61 61 #-----------------------------------------------------------------------------
62 62 # Main kernel class
63 63 #-----------------------------------------------------------------------------
64 64
65 ipython_version = list(IPython.version_info)
65 protocol_version = list(release.kernel_protocol_version_info)
66 ipython_version = list(release.version_info)
66 67 language_version = list(sys.version_info[:3])
67 68
68 69
69 70 class Kernel(Configurable):
70 71
71 72 #---------------------------------------------------------------------------
72 73 # Kernel interface
73 74 #---------------------------------------------------------------------------
74 75
75 76 # attribute to override with a GUI
76 77 eventloop = Any(None)
77 78 def _eventloop_changed(self, name, old, new):
78 79 """schedule call to eventloop from IOLoop"""
79 80 loop = ioloop.IOLoop.instance()
80 81 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
81 82
82 83 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
83 84 session = Instance(Session)
84 85 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
85 86 shell_streams = List()
86 87 control_stream = Instance(ZMQStream)
87 88 iopub_socket = Instance(zmq.Socket)
88 89 stdin_socket = Instance(zmq.Socket)
89 90 log = Instance(logging.Logger)
90 91
91 92 user_module = Any()
92 93 def _user_module_changed(self, name, old, new):
93 94 if self.shell is not None:
94 95 self.shell.user_module = new
95 96
96 97 user_ns = Dict(default_value=None)
97 98 def _user_ns_changed(self, name, old, new):
98 99 if self.shell is not None:
99 100 self.shell.user_ns = new
100 101 self.shell.init_user_ns()
101 102
102 103 # identities:
103 104 int_id = Integer(-1)
104 105 ident = Unicode()
105 106
106 107 def _ident_default(self):
107 108 return unicode(uuid.uuid4())
108 109
109 110
110 111 # Private interface
111 112
112 113 # Time to sleep after flushing the stdout/err buffers in each execute
113 114 # cycle. While this introduces a hard limit on the minimal latency of the
114 115 # execute cycle, it helps prevent output synchronization problems for
115 116 # clients.
116 117 # Units are in seconds. The minimum zmq latency on local host is probably
117 118 # ~150 microseconds, set this to 500us for now. We may need to increase it
118 119 # a little if it's not enough after more interactive testing.
119 120 _execute_sleep = Float(0.0005, config=True)
120 121
121 122 # Frequency of the kernel's event loop.
122 123 # Units are in seconds, kernel subclasses for GUI toolkits may need to
123 124 # adapt to milliseconds.
124 125 _poll_interval = Float(0.05, config=True)
125 126
126 127 # If the shutdown was requested over the network, we leave here the
127 128 # necessary reply message so it can be sent by our registered atexit
128 129 # handler. This ensures that the reply is only sent to clients truly at
129 130 # the end of our shutdown process (which happens after the underlying
130 131 # IPython shell's own shutdown).
131 132 _shutdown_message = None
132 133
133 134 # This is a dict of port numbers that the kernel is listening on. It is set
134 135 # by record_ports and used by connect_request.
135 136 _recorded_ports = Dict()
136 137
137 138 # set of aborted msg_ids
138 139 aborted = Set()
139 140
140 141
141 142 def __init__(self, **kwargs):
142 143 super(Kernel, self).__init__(**kwargs)
143 144
144 145 # Initialize the InteractiveShell subclass
145 146 self.shell = ZMQInteractiveShell.instance(config=self.config,
146 147 profile_dir = self.profile_dir,
147 148 user_module = self.user_module,
148 149 user_ns = self.user_ns,
149 150 )
150 151 self.shell.displayhook.session = self.session
151 152 self.shell.displayhook.pub_socket = self.iopub_socket
152 153 self.shell.displayhook.topic = self._topic('pyout')
153 154 self.shell.display_pub.session = self.session
154 155 self.shell.display_pub.pub_socket = self.iopub_socket
155 156 self.shell.data_pub.session = self.session
156 157 self.shell.data_pub.pub_socket = self.iopub_socket
157 158
158 159 # TMP - hack while developing
159 160 self.shell._reply_content = None
160 161
161 162 # Build dict of handlers for message types
162 163 msg_types = [ 'execute_request', 'complete_request',
163 164 'object_info_request', 'history_request',
164 165 'kernel_info_request',
165 166 'connect_request', 'shutdown_request',
166 167 'apply_request',
167 168 ]
168 169 self.shell_handlers = {}
169 170 for msg_type in msg_types:
170 171 self.shell_handlers[msg_type] = getattr(self, msg_type)
171 172
172 173 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
173 174 self.control_handlers = {}
174 175 for msg_type in control_msg_types:
175 176 self.control_handlers[msg_type] = getattr(self, msg_type)
176 177
177 178 def dispatch_control(self, msg):
178 179 """dispatch control requests"""
179 180 idents,msg = self.session.feed_identities(msg, copy=False)
180 181 try:
181 182 msg = self.session.unserialize(msg, content=True, copy=False)
182 183 except:
183 184 self.log.error("Invalid Control Message", exc_info=True)
184 185 return
185 186
186 187 self.log.debug("Control received: %s", msg)
187 188
188 189 header = msg['header']
189 190 msg_id = header['msg_id']
190 191 msg_type = header['msg_type']
191 192
192 193 handler = self.control_handlers.get(msg_type, None)
193 194 if handler is None:
194 195 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
195 196 else:
196 197 try:
197 198 handler(self.control_stream, idents, msg)
198 199 except Exception:
199 200 self.log.error("Exception in control handler:", exc_info=True)
200 201
201 202 def dispatch_shell(self, stream, msg):
202 203 """dispatch shell requests"""
203 204 # flush control requests first
204 205 if self.control_stream:
205 206 self.control_stream.flush()
206 207
207 208 idents,msg = self.session.feed_identities(msg, copy=False)
208 209 try:
209 210 msg = self.session.unserialize(msg, content=True, copy=False)
210 211 except:
211 212 self.log.error("Invalid Message", exc_info=True)
212 213 return
213 214
214 215 header = msg['header']
215 216 msg_id = header['msg_id']
216 217 msg_type = msg['header']['msg_type']
217 218
218 219 # Print some info about this message and leave a '--->' marker, so it's
219 220 # easier to trace visually the message chain when debugging. Each
220 221 # handler prints its message at the end.
221 222 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
222 223 self.log.debug(' Content: %s\n --->\n ', msg['content'])
223 224
224 225 if msg_id in self.aborted:
225 226 self.aborted.remove(msg_id)
226 227 # is it safe to assume a msg_id will not be resubmitted?
227 228 reply_type = msg_type.split('_')[0] + '_reply'
228 229 status = {'status' : 'aborted'}
229 230 md = {'engine' : self.ident}
230 231 md.update(status)
231 232 reply_msg = self.session.send(stream, reply_type, metadata=md,
232 233 content=status, parent=msg, ident=idents)
233 234 return
234 235
235 236 handler = self.shell_handlers.get(msg_type, None)
236 237 if handler is None:
237 238 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
238 239 else:
239 240 # ensure default_int_handler during handler call
240 241 sig = signal(SIGINT, default_int_handler)
241 242 try:
242 243 handler(stream, idents, msg)
243 244 except Exception:
244 245 self.log.error("Exception in message handler:", exc_info=True)
245 246 finally:
246 247 signal(SIGINT, sig)
247 248
248 249 def enter_eventloop(self):
249 250 """enter eventloop"""
250 251 self.log.info("entering eventloop")
251 252 # restore default_int_handler
252 253 signal(SIGINT, default_int_handler)
253 254 while self.eventloop is not None:
254 255 try:
255 256 self.eventloop(self)
256 257 except KeyboardInterrupt:
257 258 # Ctrl-C shouldn't crash the kernel
258 259 self.log.error("KeyboardInterrupt caught in kernel")
259 260 continue
260 261 else:
261 262 # eventloop exited cleanly, this means we should stop (right?)
262 263 self.eventloop = None
263 264 break
264 265 self.log.info("exiting eventloop")
265 266
266 267 def start(self):
267 268 """register dispatchers for streams"""
268 269 self.shell.exit_now = False
269 270 if self.control_stream:
270 271 self.control_stream.on_recv(self.dispatch_control, copy=False)
271 272
272 273 def make_dispatcher(stream):
273 274 def dispatcher(msg):
274 275 return self.dispatch_shell(stream, msg)
275 276 return dispatcher
276 277
277 278 for s in self.shell_streams:
278 279 s.on_recv(make_dispatcher(s), copy=False)
279 280
280 281 def do_one_iteration(self):
281 282 """step eventloop just once"""
282 283 if self.control_stream:
283 284 self.control_stream.flush()
284 285 for stream in self.shell_streams:
285 286 # handle at most one request per iteration
286 287 stream.flush(zmq.POLLIN, 1)
287 288 stream.flush(zmq.POLLOUT)
288 289
289 290
290 291 def record_ports(self, ports):
291 292 """Record the ports that this kernel is using.
292 293
293 294 The creator of the Kernel instance must call this method if they
294 295 want the :meth:`connect_request` method to return the port numbers.
295 296 """
296 297 self._recorded_ports = ports
297 298
298 299 #---------------------------------------------------------------------------
299 300 # Kernel request handlers
300 301 #---------------------------------------------------------------------------
301 302
302 303 def _make_metadata(self, other=None):
303 304 """init metadata dict, for execute/apply_reply"""
304 305 new_md = {
305 306 'dependencies_met' : True,
306 307 'engine' : self.ident,
307 308 'started': datetime.now(),
308 309 }
309 310 if other:
310 311 new_md.update(other)
311 312 return new_md
312 313
313 314 def _publish_pyin(self, code, parent, execution_count):
314 315 """Publish the code request on the pyin stream."""
315 316
316 317 self.session.send(self.iopub_socket, u'pyin',
317 318 {u'code':code, u'execution_count': execution_count},
318 319 parent=parent, ident=self._topic('pyin')
319 320 )
320 321
321 322 def _publish_status(self, status, parent=None):
322 323 """send status (busy/idle) on IOPub"""
323 324 self.session.send(self.iopub_socket,
324 325 u'status',
325 326 {u'execution_state': status},
326 327 parent=parent,
327 328 ident=self._topic('status'),
328 329 )
329 330
330 331
331 332 def execute_request(self, stream, ident, parent):
332 333 """handle an execute_request"""
333 334
334 335 self._publish_status(u'busy', parent)
335 336
336 337 try:
337 338 content = parent[u'content']
338 339 code = content[u'code']
339 340 silent = content[u'silent']
340 341 store_history = content.get(u'store_history', not silent)
341 342 except:
342 343 self.log.error("Got bad msg: ")
343 344 self.log.error("%s", parent)
344 345 return
345 346
346 347 md = self._make_metadata(parent['metadata'])
347 348
348 349 shell = self.shell # we'll need this a lot here
349 350
350 351 # Replace raw_input. Note that it is not sufficient to replace
351 352 # raw_input in the user namespace.
352 353 if content.get('allow_stdin', False):
353 354 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
354 355 else:
355 356 raw_input = lambda prompt='' : self._no_raw_input()
356 357
357 358 if py3compat.PY3:
358 359 __builtin__.input = raw_input
359 360 else:
360 361 __builtin__.raw_input = raw_input
361 362
362 363 # Set the parent message of the display hook and out streams.
363 364 shell.displayhook.set_parent(parent)
364 365 shell.display_pub.set_parent(parent)
365 366 shell.data_pub.set_parent(parent)
366 367 sys.stdout.set_parent(parent)
367 368 sys.stderr.set_parent(parent)
368 369
369 370 # Re-broadcast our input for the benefit of listening clients, and
370 371 # start computing output
371 372 if not silent:
372 373 self._publish_pyin(code, parent, shell.execution_count)
373 374
374 375 reply_content = {}
375 376 try:
376 377 # FIXME: the shell calls the exception handler itself.
377 378 shell.run_cell(code, store_history=store_history, silent=silent)
378 379 except:
379 380 status = u'error'
380 381 # FIXME: this code right now isn't being used yet by default,
381 382 # because the run_cell() call above directly fires off exception
382 383 # reporting. This code, therefore, is only active in the scenario
383 384 # where runlines itself has an unhandled exception. We need to
384 385 # uniformize this, for all exception construction to come from a
385 386 # single location in the codebase.
386 387 etype, evalue, tb = sys.exc_info()
387 388 tb_list = traceback.format_exception(etype, evalue, tb)
388 389 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
389 390 else:
390 391 status = u'ok'
391 392
392 393 reply_content[u'status'] = status
393 394
394 395 # Return the execution counter so clients can display prompts
395 396 reply_content['execution_count'] = shell.execution_count - 1
396 397
397 398 # FIXME - fish exception info out of shell, possibly left there by
398 399 # runlines. We'll need to clean up this logic later.
399 400 if shell._reply_content is not None:
400 401 reply_content.update(shell._reply_content)
401 402 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
402 403 reply_content['engine_info'] = e_info
403 404 # reset after use
404 405 shell._reply_content = None
405 406
406 407 # At this point, we can tell whether the main code execution succeeded
407 408 # or not. If it did, we proceed to evaluate user_variables/expressions
408 409 if reply_content['status'] == 'ok':
409 410 reply_content[u'user_variables'] = \
410 411 shell.user_variables(content.get(u'user_variables', []))
411 412 reply_content[u'user_expressions'] = \
412 413 shell.user_expressions(content.get(u'user_expressions', {}))
413 414 else:
414 415 # If there was an error, don't even try to compute variables or
415 416 # expressions
416 417 reply_content[u'user_variables'] = {}
417 418 reply_content[u'user_expressions'] = {}
418 419
419 420 # Payloads should be retrieved regardless of outcome, so we can both
420 421 # recover partial output (that could have been generated early in a
421 422 # block, before an error) and clear the payload system always.
422 423 reply_content[u'payload'] = shell.payload_manager.read_payload()
423 424 # Be aggressive about clearing the payload because we don't want
424 425 # it to sit in memory until the next execute_request comes in.
425 426 shell.payload_manager.clear_payload()
426 427
427 428 # Flush output before sending the reply.
428 429 sys.stdout.flush()
429 430 sys.stderr.flush()
430 431 # FIXME: on rare occasions, the flush doesn't seem to make it to the
431 432 # clients... This seems to mitigate the problem, but we definitely need
432 433 # to better understand what's going on.
433 434 if self._execute_sleep:
434 435 time.sleep(self._execute_sleep)
435 436
436 437 # Send the reply.
437 438 reply_content = json_clean(reply_content)
438 439
439 440 md['status'] = reply_content['status']
440 441 if reply_content['status'] == 'error' and \
441 442 reply_content['ename'] == 'UnmetDependency':
442 443 md['dependencies_met'] = False
443 444
444 445 reply_msg = self.session.send(stream, u'execute_reply',
445 446 reply_content, parent, metadata=md,
446 447 ident=ident)
447 448
448 449 self.log.debug("%s", reply_msg)
449 450
450 451 if not silent and reply_msg['content']['status'] == u'error':
451 452 self._abort_queues()
452 453
453 454 self._publish_status(u'idle', parent)
454 455
455 456 def complete_request(self, stream, ident, parent):
456 457 txt, matches = self._complete(parent)
457 458 matches = {'matches' : matches,
458 459 'matched_text' : txt,
459 460 'status' : 'ok'}
460 461 matches = json_clean(matches)
461 462 completion_msg = self.session.send(stream, 'complete_reply',
462 463 matches, parent, ident)
463 464 self.log.debug("%s", completion_msg)
464 465
465 466 def object_info_request(self, stream, ident, parent):
466 467 content = parent['content']
467 468 object_info = self.shell.object_inspect(content['oname'],
468 469 detail_level = content.get('detail_level', 0)
469 470 )
470 471 # Before we send this object over, we scrub it for JSON usage
471 472 oinfo = json_clean(object_info)
472 473 msg = self.session.send(stream, 'object_info_reply',
473 474 oinfo, parent, ident)
474 475 self.log.debug("%s", msg)
475 476
476 477 def history_request(self, stream, ident, parent):
477 478 # We need to pull these out, as passing **kwargs doesn't work with
478 479 # unicode keys before Python 2.6.5.
479 480 hist_access_type = parent['content']['hist_access_type']
480 481 raw = parent['content']['raw']
481 482 output = parent['content']['output']
482 483 if hist_access_type == 'tail':
483 484 n = parent['content']['n']
484 485 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
485 486 include_latest=True)
486 487
487 488 elif hist_access_type == 'range':
488 489 session = parent['content']['session']
489 490 start = parent['content']['start']
490 491 stop = parent['content']['stop']
491 492 hist = self.shell.history_manager.get_range(session, start, stop,
492 493 raw=raw, output=output)
493 494
494 495 elif hist_access_type == 'search':
495 496 n = parent['content'].get('n')
496 497 pattern = parent['content']['pattern']
497 498 hist = self.shell.history_manager.search(pattern, raw=raw,
498 499 output=output, n=n)
499 500
500 501 else:
501 502 hist = []
502 503 hist = list(hist)
503 504 content = {'history' : hist}
504 505 content = json_clean(content)
505 506 msg = self.session.send(stream, 'history_reply',
506 507 content, parent, ident)
507 508 self.log.debug("Sending history reply with %i entries", len(hist))
508 509
509 510 def connect_request(self, stream, ident, parent):
510 511 if self._recorded_ports is not None:
511 512 content = self._recorded_ports.copy()
512 513 else:
513 514 content = {}
514 515 msg = self.session.send(stream, 'connect_reply',
515 516 content, parent, ident)
516 517 self.log.debug("%s", msg)
517 518
518 519 def kernel_info_request(self, stream, ident, parent):
519 520 vinfo = {
520 521 'protocol_version': protocol_version,
521 522 'ipython_version': ipython_version,
522 523 'language_version': language_version,
523 524 'language': 'python',
524 525 }
525 526 msg = self.session.send(stream, 'kernel_info_reply',
526 527 vinfo, parent, ident)
527 528 self.log.debug("%s", msg)
528 529
529 530 def shutdown_request(self, stream, ident, parent):
530 531 self.shell.exit_now = True
531 532 content = dict(status='ok')
532 533 content.update(parent['content'])
533 534 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
534 535 # same content, but different msg_id for broadcasting on IOPub
535 536 self._shutdown_message = self.session.msg(u'shutdown_reply',
536 537 content, parent
537 538 )
538 539
539 540 self._at_shutdown()
540 541 # call sys.exit after a short delay
541 542 loop = ioloop.IOLoop.instance()
542 543 loop.add_timeout(time.time()+0.1, loop.stop)
543 544
544 545 #---------------------------------------------------------------------------
545 546 # Engine methods
546 547 #---------------------------------------------------------------------------
547 548
548 549 def apply_request(self, stream, ident, parent):
549 550 try:
550 551 content = parent[u'content']
551 552 bufs = parent[u'buffers']
552 553 msg_id = parent['header']['msg_id']
553 554 except:
554 555 self.log.error("Got bad msg: %s", parent, exc_info=True)
555 556 return
556 557
557 558 self._publish_status(u'busy', parent)
558 559
559 560 # Set the parent message of the display hook and out streams.
560 561 shell = self.shell
561 562 shell.displayhook.set_parent(parent)
562 563 shell.display_pub.set_parent(parent)
563 564 shell.data_pub.set_parent(parent)
564 565 sys.stdout.set_parent(parent)
565 566 sys.stderr.set_parent(parent)
566 567
567 568 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
568 569 # self.iopub_socket.send(pyin_msg)
569 570 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
570 571 md = self._make_metadata(parent['metadata'])
571 572 try:
572 573 working = shell.user_ns
573 574
574 575 prefix = "_"+str(msg_id).replace("-","")+"_"
575 576
576 577 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
577 578
578 579 fname = getattr(f, '__name__', 'f')
579 580
580 581 fname = prefix+"f"
581 582 argname = prefix+"args"
582 583 kwargname = prefix+"kwargs"
583 584 resultname = prefix+"result"
584 585
585 586 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
586 587 # print ns
587 588 working.update(ns)
588 589 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
589 590 try:
590 591 exec code in shell.user_global_ns, shell.user_ns
591 592 result = working.get(resultname)
592 593 finally:
593 594 for key in ns.iterkeys():
594 595 working.pop(key)
595 596
596 597 result_buf = serialize_object(result,
597 598 buffer_threshold=self.session.buffer_threshold,
598 599 item_threshold=self.session.item_threshold,
599 600 )
600 601
601 602 except:
602 603 # invoke IPython traceback formatting
603 604 shell.showtraceback()
604 605 # FIXME - fish exception info out of shell, possibly left there by
605 606 # run_code. We'll need to clean up this logic later.
606 607 reply_content = {}
607 608 if shell._reply_content is not None:
608 609 reply_content.update(shell._reply_content)
609 610 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
610 611 reply_content['engine_info'] = e_info
611 612 # reset after use
612 613 shell._reply_content = None
613 614
614 615 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
615 616 ident=self._topic('pyerr'))
616 617 result_buf = []
617 618
618 619 if reply_content['ename'] == 'UnmetDependency':
619 620 md['dependencies_met'] = False
620 621 else:
621 622 reply_content = {'status' : 'ok'}
622 623
623 624 # put 'ok'/'error' status in header, for scheduler introspection:
624 625 md['status'] = reply_content['status']
625 626
626 627 # flush i/o
627 628 sys.stdout.flush()
628 629 sys.stderr.flush()
629 630
630 631 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
631 632 parent=parent, ident=ident,buffers=result_buf, metadata=md)
632 633
633 634 self._publish_status(u'idle', parent)
634 635
635 636 #---------------------------------------------------------------------------
636 637 # Control messages
637 638 #---------------------------------------------------------------------------
638 639
639 640 def abort_request(self, stream, ident, parent):
640 641 """abort a specifig msg by id"""
641 642 msg_ids = parent['content'].get('msg_ids', None)
642 643 if isinstance(msg_ids, basestring):
643 644 msg_ids = [msg_ids]
644 645 if not msg_ids:
645 646 self.abort_queues()
646 647 for mid in msg_ids:
647 648 self.aborted.add(str(mid))
648 649
649 650 content = dict(status='ok')
650 651 reply_msg = self.session.send(stream, 'abort_reply', content=content,
651 652 parent=parent, ident=ident)
652 653 self.log.debug("%s", reply_msg)
653 654
654 655 def clear_request(self, stream, idents, parent):
655 656 """Clear our namespace."""
656 657 self.shell.reset(False)
657 658 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
658 659 content = dict(status='ok'))
659 660
660 661
661 662 #---------------------------------------------------------------------------
662 663 # Protected interface
663 664 #---------------------------------------------------------------------------
664 665
665 666
666 667 def _wrap_exception(self, method=None):
667 668 # import here, because _wrap_exception is only used in parallel,
668 669 # and parallel has higher min pyzmq version
669 670 from IPython.parallel.error import wrap_exception
670 671 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
671 672 content = wrap_exception(e_info)
672 673 return content
673 674
674 675 def _topic(self, topic):
675 676 """prefixed topic for IOPub messages"""
676 677 if self.int_id >= 0:
677 678 base = "engine.%i" % self.int_id
678 679 else:
679 680 base = "kernel.%s" % self.ident
680 681
681 682 return py3compat.cast_bytes("%s.%s" % (base, topic))
682 683
683 684 def _abort_queues(self):
684 685 for stream in self.shell_streams:
685 686 if stream:
686 687 self._abort_queue(stream)
687 688
688 689 def _abort_queue(self, stream):
689 690 poller = zmq.Poller()
690 691 poller.register(stream.socket, zmq.POLLIN)
691 692 while True:
692 693 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
693 694 if msg is None:
694 695 return
695 696
696 697 self.log.info("Aborting:")
697 698 self.log.info("%s", msg)
698 699 msg_type = msg['header']['msg_type']
699 700 reply_type = msg_type.split('_')[0] + '_reply'
700 701
701 702 status = {'status' : 'aborted'}
702 703 md = {'engine' : self.ident}
703 704 md.update(status)
704 705 reply_msg = self.session.send(stream, reply_type, metadata=md,
705 706 content=status, parent=msg, ident=idents)
706 707 self.log.debug("%s", reply_msg)
707 708 # We need to wait a bit for requests to come in. This can probably
708 709 # be set shorter for true asynchronous clients.
709 710 poller.poll(50)
710 711
711 712
712 713 def _no_raw_input(self):
713 714 """Raise StdinNotImplentedError if active frontend doesn't support
714 715 stdin."""
715 716 raise StdinNotImplementedError("raw_input was called, but this "
716 717 "frontend does not support stdin.")
717 718
718 719 def _raw_input(self, prompt, ident, parent):
719 720 # Flush output before making the request.
720 721 sys.stderr.flush()
721 722 sys.stdout.flush()
722 723
723 724 # Send the input request.
724 725 content = json_clean(dict(prompt=prompt))
725 726 self.session.send(self.stdin_socket, u'input_request', content, parent,
726 727 ident=ident)
727 728
728 729 # Await a response.
729 730 while True:
730 731 try:
731 732 ident, reply = self.session.recv(self.stdin_socket, 0)
732 733 except Exception:
733 734 self.log.warn("Invalid Message:", exc_info=True)
734 735 else:
735 736 break
736 737 try:
737 738 value = reply['content']['value']
738 739 except:
739 740 self.log.error("Got bad raw_input reply: ")
740 741 self.log.error("%s", parent)
741 742 value = ''
742 743 if value == '\x04':
743 744 # EOF
744 745 raise EOFError
745 746 return value
746 747
747 748 def _complete(self, msg):
748 749 c = msg['content']
749 750 try:
750 751 cpos = int(c['cursor_pos'])
751 752 except:
752 753 # If we don't get something that we can convert to an integer, at
753 754 # least attempt the completion guessing the cursor is at the end of
754 755 # the text, if there's any, and otherwise of the line
755 756 cpos = len(c['text'])
756 757 if cpos==0:
757 758 cpos = len(c['line'])
758 759 return self.shell.complete(c['text'], c['line'], cpos)
759 760
760 761 def _object_info(self, context):
761 762 symbol, leftover = self._symbol_from_context(context)
762 763 if symbol is not None and not leftover:
763 764 doc = getattr(symbol, '__doc__', '')
764 765 else:
765 766 doc = ''
766 767 object_info = dict(docstring = doc)
767 768 return object_info
768 769
769 770 def _symbol_from_context(self, context):
770 771 if not context:
771 772 return None, context
772 773
773 774 base_symbol_string = context[0]
774 775 symbol = self.shell.user_ns.get(base_symbol_string, None)
775 776 if symbol is None:
776 777 symbol = __builtin__.__dict__.get(base_symbol_string, None)
777 778 if symbol is None:
778 779 return None, context
779 780
780 781 context = context[1:]
781 782 for i, name in enumerate(context):
782 783 new_symbol = getattr(symbol, name, None)
783 784 if new_symbol is None:
784 785 return symbol, context[i:]
785 786 else:
786 787 symbol = new_symbol
787 788
788 789 return symbol, []
789 790
790 791 def _at_shutdown(self):
791 792 """Actions taken at shutdown by the kernel, called by python's atexit.
792 793 """
793 794 # io.rprint("Kernel at_shutdown") # dbg
794 795 if self._shutdown_message is not None:
795 796 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
796 797 self.log.debug("%s", self._shutdown_message)
797 798 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
798 799
799 800 #-----------------------------------------------------------------------------
800 801 # Aliases and Flags for the IPKernelApp
801 802 #-----------------------------------------------------------------------------
802 803
803 804 flags = dict(kernel_flags)
804 805 flags.update(shell_flags)
805 806
806 807 addflag = lambda *args: flags.update(boolean_flag(*args))
807 808
808 809 flags['pylab'] = (
809 810 {'IPKernelApp' : {'pylab' : 'auto'}},
810 811 """Pre-load matplotlib and numpy for interactive use with
811 812 the default matplotlib backend."""
812 813 )
813 814
814 815 aliases = dict(kernel_aliases)
815 816 aliases.update(shell_aliases)
816 817
817 818 #-----------------------------------------------------------------------------
818 819 # The IPKernelApp class
819 820 #-----------------------------------------------------------------------------
820 821
821 822 class IPKernelApp(KernelApp, InteractiveShellApp):
822 823 name = 'ipkernel'
823 824
824 825 aliases = Dict(aliases)
825 826 flags = Dict(flags)
826 827 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
827 828
828 829 @catch_config_error
829 830 def initialize(self, argv=None):
830 831 super(IPKernelApp, self).initialize(argv)
831 832 self.init_path()
832 833 self.init_shell()
833 834 self.init_gui_pylab()
834 835 self.init_extensions()
835 836 self.init_code()
836 837
837 838 def init_kernel(self):
838 839
839 840 shell_stream = ZMQStream(self.shell_socket)
840 841
841 842 kernel = Kernel(config=self.config, session=self.session,
842 843 shell_streams=[shell_stream],
843 844 iopub_socket=self.iopub_socket,
844 845 stdin_socket=self.stdin_socket,
845 846 log=self.log,
846 847 profile_dir=self.profile_dir,
847 848 )
848 849 self.kernel = kernel
849 850 kernel.record_ports(self.ports)
850 851 shell = kernel.shell
851 852
852 853 def init_gui_pylab(self):
853 854 """Enable GUI event loop integration, taking pylab into account."""
854 855
855 856 # Provide a wrapper for :meth:`InteractiveShellApp.init_gui_pylab`
856 857 # to ensure that any exception is printed straight to stderr.
857 858 # Normally _showtraceback associates the reply with an execution,
858 859 # which means frontends will never draw it, as this exception
859 860 # is not associated with any execute request.
860 861
861 862 shell = self.shell
862 863 _showtraceback = shell._showtraceback
863 864 try:
864 865 # replace pyerr-sending traceback with stderr
865 866 def print_tb(etype, evalue, stb):
866 867 print ("GUI event loop or pylab initialization failed",
867 868 file=io.stderr)
868 869 print (shell.InteractiveTB.stb2text(stb), file=io.stderr)
869 870 shell._showtraceback = print_tb
870 871 InteractiveShellApp.init_gui_pylab(self)
871 872 finally:
872 873 shell._showtraceback = _showtraceback
873 874
874 875 def init_shell(self):
875 876 self.shell = self.kernel.shell
876 877 self.shell.configurables.append(self)
877 878
878 879
879 880 #-----------------------------------------------------------------------------
880 881 # Kernel main and launch functions
881 882 #-----------------------------------------------------------------------------
882 883
883 884 def launch_kernel(*args, **kwargs):
884 885 """Launches a localhost IPython kernel, binding to the specified ports.
885 886
886 887 This function simply calls entry_point.base_launch_kernel with the right
887 888 first command to start an ipkernel. See base_launch_kernel for arguments.
888 889
889 890 Returns
890 891 -------
891 892 A tuple of form:
892 893 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
893 894 where kernel_process is a Popen object and the ports are integers.
894 895 """
895 896 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
896 897 *args, **kwargs)
897 898
898 899
899 900 def embed_kernel(module=None, local_ns=None, **kwargs):
900 901 """Embed and start an IPython kernel in a given scope.
901 902
902 903 Parameters
903 904 ----------
904 905 module : ModuleType, optional
905 906 The module to load into IPython globals (default: caller)
906 907 local_ns : dict, optional
907 908 The namespace to load into IPython user namespace (default: caller)
908 909
909 910 kwargs : various, optional
910 911 Further keyword args are relayed to the KernelApp constructor,
911 912 allowing configuration of the Kernel. Will only have an effect
912 913 on the first embed_kernel call for a given process.
913 914
914 915 """
915 916 # get the app if it exists, or set it up if it doesn't
916 917 if IPKernelApp.initialized():
917 918 app = IPKernelApp.instance()
918 919 else:
919 920 app = IPKernelApp.instance(**kwargs)
920 921 app.initialize([])
921 922 # Undo unnecessary sys module mangling from init_sys_modules.
922 923 # This would not be necessary if we could prevent it
923 924 # in the first place by using a different InteractiveShell
924 925 # subclass, as in the regular embed case.
925 926 main = app.kernel.shell._orig_sys_modules_main_mod
926 927 if main is not None:
927 928 sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main
928 929
929 930 # load the calling scope if not given
930 931 (caller_module, caller_locals) = extract_module_locals(1)
931 932 if module is None:
932 933 module = caller_module
933 934 if local_ns is None:
934 935 local_ns = caller_locals
935 936
936 937 app.kernel.user_module = module
937 938 app.kernel.user_ns = local_ns
938 939 app.shell.set_completer_frame()
939 940 app.start()
940 941
941 942 def main():
942 943 """Run an IPKernel as an application"""
943 944 app = IPKernelApp.instance()
944 945 app.initialize()
945 946 app.start()
946 947
947 948
948 949 if __name__ == '__main__':
949 950 main()
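For reference, the kernel_info_request handler above packages the three module-level version lists into its reply. A hedged sketch of the reply content a frontend would receive (protocol and IPython values from this revision; the language_version shown is only an example, since it mirrors sys.version_info of whichever interpreter runs the kernel):

    # Illustrative kernel_info_reply content; keys match kernel_info_request above.
    kernel_info_reply_content = {
        'protocol_version': [4, 0],             # list(release.kernel_protocol_version_info)
        'ipython_version':  [0, 14, 0, 'dev'],  # list(release.version_info)
        'language_version': [2, 7, 3],          # sys.version_info[:3] of the running kernel (example)
        'language': 'python',
    }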
@@ -1,769 +1,766 b''
1 1 """Session object for building, serializing, sending, and receiving messages in
2 2 IPython. The Session object supports serialization, HMAC signatures, and
3 3 metadata on messages.
4 4
5 5 Also defined here are utilities for working with Sessions:
6 6 * A SessionFactory to be used as a base class for configurables that work with
7 7 Sessions.
8 8 * A Message object for convenience that allows attribute-access to the msg dict.
9 9
10 10 Authors:
11 11
12 12 * Min RK
13 13 * Brian Granger
14 14 * Fernando Perez
15 15 """
16 16 #-----------------------------------------------------------------------------
17 17 # Copyright (C) 2010-2011 The IPython Development Team
18 18 #
19 19 # Distributed under the terms of the BSD License. The full license is in
20 20 # the file COPYING, distributed as part of this software.
21 21 #-----------------------------------------------------------------------------
22 22
23 23 #-----------------------------------------------------------------------------
24 24 # Imports
25 25 #-----------------------------------------------------------------------------
26 26
27 27 import hmac
28 28 import logging
29 29 import os
30 30 import pprint
31 31 import uuid
32 32 from datetime import datetime
33 33
34 34 try:
35 35 import cPickle
36 36 pickle = cPickle
37 37 except:
38 38 cPickle = None
39 39 import pickle
40 40
41 41 import zmq
42 42 from zmq.utils import jsonapi
43 43 from zmq.eventloop.ioloop import IOLoop
44 44 from zmq.eventloop.zmqstream import ZMQStream
45 45
46 46 from IPython.config.application import Application, boolean_flag
47 47 from IPython.config.configurable import Configurable, LoggingConfigurable
48 48 from IPython.utils.importstring import import_item
49 49 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
50 50 from IPython.utils.py3compat import str_to_bytes
51 51 from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set,
52 52 DottedObjectName, CUnicode, Dict, Integer)
53 53 from IPython.zmq.serialize import MAX_ITEMS, MAX_BYTES
54 54
55 55 #-----------------------------------------------------------------------------
56 56 # utility functions
57 57 #-----------------------------------------------------------------------------
58 58
59 59 def squash_unicode(obj):
60 60 """coerce unicode back to bytestrings."""
61 61 if isinstance(obj,dict):
62 62 for key in obj.keys():
63 63 obj[key] = squash_unicode(obj[key])
64 64 if isinstance(key, unicode):
65 65 obj[squash_unicode(key)] = obj.pop(key)
66 66 elif isinstance(obj, list):
67 67 for i,v in enumerate(obj):
68 68 obj[i] = squash_unicode(v)
69 69 elif isinstance(obj, unicode):
70 70 obj = obj.encode('utf8')
71 71 return obj
72 72
73 73 #-----------------------------------------------------------------------------
74 74 # globals and defaults
75 75 #-----------------------------------------------------------------------------
76 76
77 # Change this when incrementing the kernel protocol version
78 protocol_version = [4, 0]
79
80 77 # ISO8601-ify datetime objects
81 78 json_packer = lambda obj: jsonapi.dumps(obj, default=date_default)
82 79 json_unpacker = lambda s: extract_dates(jsonapi.loads(s))
83 80
84 81 pickle_packer = lambda o: pickle.dumps(o,-1)
85 82 pickle_unpacker = pickle.loads
86 83
87 84 default_packer = json_packer
88 85 default_unpacker = json_unpacker
89 86
90 87 DELIM = b"<IDS|MSG>"
91 88 # singleton dummy tracker, which will always report as done
92 89 DONE = zmq.MessageTracker()
93 90
94 91 #-----------------------------------------------------------------------------
95 92 # Mixin tools for apps that use Sessions
96 93 #-----------------------------------------------------------------------------
97 94
98 95 session_aliases = dict(
99 96 ident = 'Session.session',
100 97 user = 'Session.username',
101 98 keyfile = 'Session.keyfile',
102 99 )
103 100
104 101 session_flags = {
105 102 'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())),
106 103 'keyfile' : '' }},
107 104 """Use HMAC digests for authentication of messages.
108 105 Setting this flag will generate a new UUID to use as the HMAC key.
109 106 """),
110 107 'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }},
111 108 """Don't authenticate messages."""),
112 109 }
113 110
114 111 def default_secure(cfg):
115 112 """Set the default behavior for a config environment to be secure.
116 113
117 114 If Session.key/keyfile have not been set, set Session.key to
118 115 a new random UUID.
119 116 """
120 117
121 118 if 'Session' in cfg:
122 119 if 'key' in cfg.Session or 'keyfile' in cfg.Session:
123 120 return
124 121 # key/keyfile not specified, generate new UUID:
125 122 cfg.Session.key = str_to_bytes(str(uuid.uuid4()))
126 123
127 124
128 125 #-----------------------------------------------------------------------------
129 126 # Classes
130 127 #-----------------------------------------------------------------------------
131 128
132 129 class SessionFactory(LoggingConfigurable):
133 130 """The Base class for configurables that have a Session, Context, logger,
134 131 and IOLoop.
135 132 """
136 133
137 134 logname = Unicode('')
138 135 def _logname_changed(self, name, old, new):
139 136 self.log = logging.getLogger(new)
140 137
141 138 # not configurable:
142 139 context = Instance('zmq.Context')
143 140 def _context_default(self):
144 141 return zmq.Context.instance()
145 142
146 143 session = Instance('IPython.zmq.session.Session')
147 144
148 145 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
149 146 def _loop_default(self):
150 147 return IOLoop.instance()
151 148
152 149 def __init__(self, **kwargs):
153 150 super(SessionFactory, self).__init__(**kwargs)
154 151
155 152 if self.session is None:
156 153 # construct the session
157 154 self.session = Session(**kwargs)
158 155
159 156
160 157 class Message(object):
161 158 """A simple message object that maps dict keys to attributes.
162 159
163 160 A Message can be created from a dict and a dict from a Message instance
164 161 simply by calling dict(msg_obj)."""
165 162
166 163 def __init__(self, msg_dict):
167 164 dct = self.__dict__
168 165 for k, v in dict(msg_dict).iteritems():
169 166 if isinstance(v, dict):
170 167 v = Message(v)
171 168 dct[k] = v
172 169
173 170 # Having this iterator lets dict(msg_obj) work out of the box.
174 171 def __iter__(self):
175 172 return iter(self.__dict__.iteritems())
176 173
177 174 def __repr__(self):
178 175 return repr(self.__dict__)
179 176
180 177 def __str__(self):
181 178 return pprint.pformat(self.__dict__)
182 179
183 180 def __contains__(self, k):
184 181 return k in self.__dict__
185 182
186 183 def __getitem__(self, k):
187 184 return self.__dict__[k]
188 185
189 186
190 187 def msg_header(msg_id, msg_type, username, session):
191 188 date = datetime.now()
192 189 return locals()
193 190
194 191 def extract_header(msg_or_header):
195 192 """Given a message or header, return the header."""
196 193 if not msg_or_header:
197 194 return {}
198 195 try:
199 196 # See if msg_or_header is the entire message.
200 197 h = msg_or_header['header']
201 198 except KeyError:
202 199 try:
203 200 # See if msg_or_header is just the header
204 201 h = msg_or_header['msg_id']
205 202 except KeyError:
206 203 raise
207 204 else:
208 205 h = msg_or_header
209 206 if not isinstance(h, dict):
210 207 h = dict(h)
211 208 return h
212 209
213 210 class Session(Configurable):
214 211 """Object for handling serialization and sending of messages.
215 212
216 213 The Session object handles building messages and sending them
217 214 with ZMQ sockets or ZMQStream objects. Objects can communicate with each
218 215 other over the network via Session objects, and only need to work with the
219 216 dict-based IPython message spec. The Session will handle
220 217 serialization/deserialization, security, and metadata.
221 218
222 219 Sessions support configurable serialization via packer/unpacker traits,
223 220 and signing with HMAC digests via the key/keyfile traits.
224 221
225 222 Parameters
226 223 ----------
227 224
228 225 debug : bool
229 226 whether to trigger extra debugging statements
230 227 packer/unpacker : str : 'json', 'pickle' or import_string
231 228 importstrings for methods to serialize message parts. If just
232 229 'json' or 'pickle', predefined JSON and pickle packers will be used.
233 230 Otherwise, the entire importstring must be used.
234 231
235 232 The functions must accept at least valid JSON input, and output *bytes*.
236 233
237 234 For example, to use msgpack:
238 235 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
239 236 pack/unpack : callables
240 237 You can also set the pack/unpack callables for serialization directly.
241 238 session : bytes
242 239 the ID of this Session object. The default is to generate a new UUID.
243 240 username : unicode
244 241 username added to message headers. The default is to ask the OS.
245 242 key : bytes
246 243 The key used to initialize an HMAC signature. If unset, messages
247 244 will not be signed or checked.
248 245 keyfile : filepath
249 246 The file containing a key. If this is set, `key` will be initialized
250 247 to the contents of the file.
251 248
252 249 """
253 250
254 251 debug=Bool(False, config=True, help="""Debug output in the Session""")
255 252
256 253 packer = DottedObjectName('json',config=True,
257 254 help="""The name of the packer for serializing messages.
258 255 Should be one of 'json', 'pickle', or an import name
259 256 for a custom callable serializer.""")
260 257 def _packer_changed(self, name, old, new):
261 258 if new.lower() == 'json':
262 259 self.pack = json_packer
263 260 self.unpack = json_unpacker
264 261 self.unpacker = new
265 262 elif new.lower() == 'pickle':
266 263 self.pack = pickle_packer
267 264 self.unpack = pickle_unpacker
268 265 self.unpacker = new
269 266 else:
270 267 self.pack = import_item(str(new))
271 268
272 269 unpacker = DottedObjectName('json', config=True,
273 270 help="""The name of the unpacker for unserializing messages.
274 271 Only used with custom functions for `packer`.""")
275 272 def _unpacker_changed(self, name, old, new):
276 273 if new.lower() == 'json':
277 274 self.pack = json_packer
278 275 self.unpack = json_unpacker
279 276 self.packer = new
280 277 elif new.lower() == 'pickle':
281 278 self.pack = pickle_packer
282 279 self.unpack = pickle_unpacker
283 280 self.packer = new
284 281 else:
285 282 self.unpack = import_item(str(new))
286 283
287 284 session = CUnicode(u'', config=True,
288 285 help="""The UUID identifying this session.""")
289 286 def _session_default(self):
290 287 u = unicode(uuid.uuid4())
291 288 self.bsession = u.encode('ascii')
292 289 return u
293 290
294 291 def _session_changed(self, name, old, new):
295 292 self.bsession = self.session.encode('ascii')
296 293
297 294 # bsession is the session as bytes
298 295 bsession = CBytes(b'')
299 296
300 297 username = Unicode(os.environ.get('USER',u'username'), config=True,
301 298 help="""Username for the Session. Default is your system username.""")
302 299
303 300 metadata = Dict({}, config=True,
304 301 help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""")
305 302
306 303 # message signature related traits:
307 304
308 305 key = CBytes(b'', config=True,
309 306 help="""execution key, for extra authentication.""")
310 307 def _key_changed(self, name, old, new):
311 308 if new:
312 309 self.auth = hmac.HMAC(new)
313 310 else:
314 311 self.auth = None
315 312 auth = Instance(hmac.HMAC)
316 313 digest_history = Set()
317 314
318 315 keyfile = Unicode('', config=True,
319 316 help="""path to file containing execution key.""")
320 317 def _keyfile_changed(self, name, old, new):
321 318 with open(new, 'rb') as f:
322 319 self.key = f.read().strip()
323 320
324 321 # serialization traits:
325 322
326 323 pack = Any(default_packer) # the actual packer function
327 324 def _pack_changed(self, name, old, new):
328 325 if not callable(new):
329 326 raise TypeError("packer must be callable, not %s"%type(new))
330 327
331 328 unpack = Any(default_unpacker) # the actual unpacker function
332 329 def _unpack_changed(self, name, old, new):
333 330 # unpacker is not checked - it is assumed to be the inverse of pack
334 331 if not callable(new):
335 332 raise TypeError("unpacker must be callable, not %s"%type(new))
336 333
337 334 # thresholds:
338 335 copy_threshold = Integer(2**16, config=True,
339 336 help="Threshold (in bytes) beyond which a buffer should be sent without copying.")
340 337 buffer_threshold = Integer(MAX_BYTES, config=True,
341 338 help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.")
342 339 item_threshold = Integer(MAX_ITEMS, config=True,
343 340 help="""The maximum number of items for a container to be introspected for custom serialization.
344 341 Containers larger than this are pickled outright.
345 342 """
346 343 )
347 344
348 345 def __init__(self, **kwargs):
349 346 """create a Session object
350 347
351 348 Parameters
352 349 ----------
353 350
354 351 debug : bool
355 352 whether to trigger extra debugging statements
356 353 packer/unpacker : str : 'json', 'pickle' or import_string
357 354 importstrings for methods to serialize message parts. If just
358 355 'json' or 'pickle', predefined JSON and pickle packers will be used.
359 356 Otherwise, the entire importstring must be used.
360 357
361 358 The functions must accept at least valid JSON input, and output
362 359 *bytes*.
363 360
364 361 For example, to use msgpack:
365 362 packer = 'msgpack.packb', unpacker='msgpack.unpackb'
366 363 pack/unpack : callables
367 364 You can also set the pack/unpack callables for serialization
368 365 directly.
369 366 session : unicode (must be ascii)
370 367 the ID of this Session object. The default is to generate a new
371 368 UUID.
372 369 bsession : bytes
373 370 The session as bytes
374 371 username : unicode
375 372 username added to message headers. The default is to ask the OS.
376 373 key : bytes
377 374 The key used to initialize an HMAC signature. If unset, messages
378 375 will not be signed or checked.
379 376 keyfile : filepath
380 377 The file containing a key. If this is set, `key` will be
381 378 initialized to the contents of the file.
382 379 """
383 380 super(Session, self).__init__(**kwargs)
384 381 self._check_packers()
385 382 self.none = self.pack({})
386 383 # ensure self._session_default() is called if necessary, so bsession is defined:
387 384 self.session
388 385
389 386 @property
390 387 def msg_id(self):
391 388 """always return new uuid"""
392 389 return str(uuid.uuid4())
393 390
394 391 def _check_packers(self):
395 392 """check packers for binary data and datetime support."""
396 393 pack = self.pack
397 394 unpack = self.unpack
398 395
399 396 # check simple serialization
400 397 msg = dict(a=[1,'hi'])
401 398 try:
402 399 packed = pack(msg)
403 400 except Exception:
404 401 raise ValueError("packer could not serialize a simple message")
405 402
406 403 # ensure packed message is bytes
407 404 if not isinstance(packed, bytes):
408 405 raise ValueError("message packed to %r, but bytes are required"%type(packed))
409 406
410 407 # check that unpack is pack's inverse
411 408 try:
412 409 unpacked = unpack(packed)
413 410 except Exception:
414 411 raise ValueError("unpacker could not handle the packer's output")
415 412
416 413 # check datetime support
417 414 msg = dict(t=datetime.now())
418 415 try:
419 416 unpacked = unpack(pack(msg))
420 417 except Exception:
421 418 self.pack = lambda o: pack(squash_dates(o))
422 419 self.unpack = lambda s: extract_dates(unpack(s))
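    # Illustrative sketch: if the configured packer cannot serialize datetimes
    # itself, the fallback above wraps pack/unpack so a round trip still works.
    #
    #     from datetime import datetime
    #     s = Session(packer='json')
    #     wire = s.pack(dict(t=datetime.now()))   # squash_dates: datetime -> ISO8601 string
    #     obj = s.unpack(wire)                    # extract_dates: ISO8601 string -> datetime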
423 420
424 421 def msg_header(self, msg_type):
425 422 return msg_header(self.msg_id, msg_type, self.username, self.session)
426 423
427 424 def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
428 425 """Return the nested message dict.
429 426
430 427 This format is different from what is sent over the wire. The
431 428 serialize/unserialize methods convert this nested message dict to the wire
432 429 format, which is a list of message parts.
433 430 """
434 431 msg = {}
435 432 header = self.msg_header(msg_type) if header is None else header
436 433 msg['header'] = header
437 434 msg['msg_id'] = header['msg_id']
438 435 msg['msg_type'] = header['msg_type']
439 436 msg['parent_header'] = {} if parent is None else extract_header(parent)
440 437 msg['content'] = {} if content is None else content
441 438 msg['metadata'] = self.metadata.copy()
442 439 if metadata is not None:
443 440 msg['metadata'].update(metadata)
444 441 return msg
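    # Illustrative sketch, where ``s`` is a Session instance and
    # 'example_request' is a made-up msg_type (any string is accepted here):
    #
    #     m = s.msg('example_request', content=dict(code='1+1'))
    #     sorted(m)   # ['content', 'header', 'metadata', 'msg_id', 'msg_type', 'parent_header']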
445 442
446 443 def sign(self, msg_list):
447 444 """Sign a message with HMAC digest. If no auth, return b''.
448 445
449 446 Parameters
450 447 ----------
451 448 msg_list : list
452 449 The [p_header,p_parent,p_content] part of the message list.
453 450 """
454 451 if self.auth is None:
455 452 return b''
456 453 h = self.auth.copy()
457 454 for m in msg_list:
458 455 h.update(m)
459 456 return str_to_bytes(h.hexdigest())
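    # Illustrative sketch: the signature is the hex HMAC digest of the packed
    # parts, so both ends must share the same key.
    #
    #     s = Session(key=b'secret')
    #     parts = [s.pack({}), s.pack({}), s.pack({}), s.pack({})]   # p_header, p_parent, p_metadata, p_content
    #     s.sign(parts) == s.sign(parts)   # True: same key and same bytes give the same digest
    #     Session().sign(parts)            # b'' -- no key means no auth, so nothing is signed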
460 457
461 458 def serialize(self, msg, ident=None):
462 459 """Serialize the message components to bytes.
463 460
464 461 This is roughly the inverse of unserialize. The serialize/unserialize
465 462 methods work with full message lists, whereas pack/unpack work with
466 463 the individual message parts in the message list.
467 464
468 465 Parameters
469 466 ----------
470 467 msg : dict or Message
471 468 The nested message dict as returned by the self.msg method.
472 469
473 470 Returns
474 471 -------
475 472 msg_list : list
476 473 The list of bytes objects to be sent with the format:
477 474 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_metadata,p_content,
478 475 buffer1,buffer2,...]. In this list, the p_* entities are
479 476 the packed or serialized versions, so if JSON is used, these
480 477 are utf8 encoded JSON strings.
481 478 """
482 479 content = msg.get('content', {})
483 480 if content is None:
484 481 content = self.none
485 482 elif isinstance(content, dict):
486 483 content = self.pack(content)
487 484 elif isinstance(content, bytes):
488 485 # content is already packed, as in a relayed message
489 486 pass
490 487 elif isinstance(content, unicode):
491 488 # should be bytes, but JSON often spits out unicode
492 489 content = content.encode('utf8')
493 490 else:
494 491 raise TypeError("Content incorrect type: %s"%type(content))
495 492
496 493 real_message = [self.pack(msg['header']),
497 494 self.pack(msg['parent_header']),
498 495 self.pack(msg['metadata']),
499 496 content,
500 497 ]
501 498
502 499 to_send = []
503 500
504 501 if isinstance(ident, list):
505 502 # accept list of idents
506 503 to_send.extend(ident)
507 504 elif ident is not None:
508 505 to_send.append(ident)
509 506 to_send.append(DELIM)
510 507
511 508 signature = self.sign(real_message)
512 509 to_send.append(signature)
513 510
514 511 to_send.extend(real_message)
515 512
516 513 return to_send
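    # Illustrative sketch, where ``s`` is a Session instance and b'client-1' is
    # a made-up zmq identity:
    #
    #     wire = s.serialize(s.msg('example_request'), ident=b'client-1')
    #     # wire == [b'client-1', DELIM, signature, p_header, p_parent, p_metadata, p_content]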
517 514
518 515 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
519 516 buffers=None, track=False, header=None, metadata=None):
520 517 """Build and send a message via stream or socket.
521 518
522 519 The message format used by this function internally is as follows:
523 520
524 521 [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content,
525 522 buffer1,buffer2,...]
526 523
527 524 The serialize/unserialize methods convert the nested message dict into this
528 525 format.
529 526
530 527 Parameters
531 528 ----------
532 529
533 530 stream : zmq.Socket or ZMQStream
534 531 The socket-like object used to send the data.
535 532 msg_or_type : str or Message/dict
536 533 Normally, msg_or_type will be a msg_type unless a message is being
537 534 sent more than once. If a header is supplied, this can be set to
538 535 None and the msg_type will be pulled from the header.
539 536
540 537 content : dict or None
541 538 The content of the message (ignored if msg_or_type is a message).
542 539 header : dict or None
543 540 The header dict for the message (ignored if msg_or_type is a message).
544 541 parent : Message or dict or None
545 542 The parent or parent header describing the parent of this message
546 543 (ignored if msg_or_type is a message).
547 544 ident : bytes or list of bytes
548 545 The zmq.IDENTITY routing path.
549 546 metadata : dict or None
550 547 The metadata describing the message
551 548 buffers : list or None
552 549 The already-serialized buffers to be appended to the message.
553 550 track : bool
554 551 Whether to track. Only for use with Sockets, because ZMQStream
555 552 objects cannot track messages.
556 553
557 554
558 555 Returns
559 556 -------
560 557 msg : dict
561 558 The constructed message.
562 559 """
563 560
564 561 if not isinstance(stream, (zmq.Socket, ZMQStream)):
565 562 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
566 563 elif track and isinstance(stream, ZMQStream):
567 564 raise TypeError("ZMQStream cannot track messages")
568 565
569 566 if isinstance(msg_or_type, (Message, dict)):
570 567 # We got a Message or message dict, not a msg_type so don't
571 568 # build a new Message.
572 569 msg = msg_or_type
573 570 else:
574 571 msg = self.msg(msg_or_type, content=content, parent=parent,
575 572 header=header, metadata=metadata)
576 573
577 574 buffers = [] if buffers is None else buffers
578 575 to_send = self.serialize(msg, ident)
579 576 to_send.extend(buffers)
580 577 longest = max([ len(s) for s in to_send ])
581 578 copy = (longest < self.copy_threshold)
582 579
583 580 if buffers and track and not copy:
584 581 # only really track when we are doing zero-copy buffers
585 582 tracker = stream.send_multipart(to_send, copy=False, track=True)
586 583 else:
587 584 # use dummy tracker, which will be done immediately
588 585 tracker = DONE
589 586 stream.send_multipart(to_send, copy=copy)
590 587
591 588 if self.debug:
592 589 pprint.pprint(msg)
593 590 pprint.pprint(to_send)
594 591 pprint.pprint(buffers)
595 592
596 593 msg['tracker'] = tracker
597 594
598 595 return msg
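    # Illustrative usage sketch (the PUB socket and address are made up):
    #
    #     import zmq
    #     sock = zmq.Context.instance().socket(zmq.PUB)
    #     sock.bind('tcp://127.0.0.1:5555')
    #     s = Session(key=b'secret')
    #     s.send(sock, 'status', content=dict(execution_state='idle'))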
599 596
600 597 def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None):
601 598 """Send a raw message via ident path.
602 599
603 600 This method is used to send an already serialized message.
604 601
605 602 Parameters
606 603 ----------
607 604 stream : ZMQStream or Socket
608 605 The ZMQ stream or socket to use for sending the message.
609 606 msg_list : list
610 607 The serialized list of messages to send. This only includes the
611 608 [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of
612 609 the message.
613 610 ident : ident or list
614 611 A single ident or a list of idents to use in sending.
615 612 """
616 613 to_send = []
617 614 if isinstance(ident, bytes):
618 615 ident = [ident]
619 616 if ident is not None:
620 617 to_send.extend(ident)
621 618
622 619 to_send.append(DELIM)
623 620 to_send.append(self.sign(msg_list))
624 621 to_send.extend(msg_list)
625 622 stream.send_multipart(to_send, flags, copy=copy)
626 623
627 624 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
628 625 """Receive and unpack a message.
629 626
630 627 Parameters
631 628 ----------
632 629 socket : ZMQStream or Socket
633 630 The socket or stream to use in receiving.
634 631
635 632 Returns
636 633 -------
637 634 [idents], msg
638 635 [idents] is a list of idents and msg is a nested message dict of
639 636 the same format as self.msg returns.
640 637 """
641 638 if isinstance(socket, ZMQStream):
642 639 socket = socket.socket
643 640 try:
644 641 msg_list = socket.recv_multipart(mode, copy=copy)
645 642 except zmq.ZMQError as e:
646 643 if e.errno == zmq.EAGAIN:
647 644 # We can convert EAGAIN to None as we know in this case
648 645 # recv_multipart won't return None.
649 646 return None,None
650 647 else:
651 648 raise
652 649 # split multipart message into identity list and message dict
653 650 # invalid large messages can cause very expensive string comparisons
654 651 idents, msg_list = self.feed_identities(msg_list, copy)
655 652 try:
656 653 return idents, self.unserialize(msg_list, content=content, copy=copy)
657 654 except Exception as e:
658 655 # TODO: handle it
659 656 raise e
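    # Illustrative sketch: the receiving side of the send() example above, where
    # ``s`` is a Session instance and ``sub_sock`` is a made-up SUB socket
    # subscribed to the PUB socket.
    #
    #     idents, msg = s.recv(sub_sock)   # default mode is zmq.NOBLOCK
    #     if msg is None:                  # nothing waiting yet (EAGAIN)
    #         pass
    #     else:
    #         print(msg['msg_type'], msg['content'])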
660 657
661 658 def feed_identities(self, msg_list, copy=True):
662 659 """Split the identities from the rest of the message.
663 660
664 661 Feed until DELIM is reached, then return the prefix as idents and
665 662 remainder as msg_list. This is easily broken by setting an IDENT to DELIM,
666 663 but that would be silly.
667 664
668 665 Parameters
669 666 ----------
670 667 msg_list : a list of Message or bytes objects
671 668 The message to be split.
672 669 copy : bool
673 670 flag determining whether the arguments are bytes or Messages
674 671
675 672 Returns
676 673 -------
677 674 (idents, msg_list) : two lists
678 675 idents will always be a list of bytes, each of which is a ZMQ
679 676 identity. msg_list will be a list of bytes or zmq.Messages of the
680 677 form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and
681 678 should be unpackable/unserializable via self.unserialize at this
682 679 point.
683 680 """
684 681 if copy:
685 682 idx = msg_list.index(DELIM)
686 683 return msg_list[:idx], msg_list[idx+1:]
687 684 else:
688 685 failed = True
689 686 for idx,m in enumerate(msg_list):
690 687 if m.bytes == DELIM:
691 688 failed = False
692 689 break
693 690 if failed:
694 691 raise ValueError("DELIM not in msg_list")
695 692 idents, msg_list = msg_list[:idx], msg_list[idx+1:]
696 693 return [m.bytes for m in idents], msg_list
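    # Illustrative sketch, where ``s`` is a Session instance: with copy=True the
    # split is a plain slice around DELIM.
    #
    #     raw = [b'client-1', DELIM, b'<sig>', b'<header>', b'<parent>', b'<metadata>', b'<content>']
    #     idents, rest = s.feed_identities(raw)
    #     # idents == [b'client-1']
    #     # rest   == [b'<sig>', b'<header>', b'<parent>', b'<metadata>', b'<content>']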
697 694
698 695 def unserialize(self, msg_list, content=True, copy=True):
699 696 """Unserialize a msg_list to a nested message dict.
700 697
701 698 This is roughly the inverse of serialize. The serialize/unserialize
702 699 methods work with full message lists, whereas pack/unpack work with
703 700 the individual message parts in the message list.
704 701
705 702 Parameters
706 703 ----------
707 704 msg_list : list of bytes or Message objects
708 705 The list of message parts of the form [HMAC,p_header,p_parent,
709 706 p_metadata,p_content,buffer1,buffer2,...].
710 707 content : bool (True)
711 708 Whether to unpack the content dict (True), or leave it packed
712 709 (False).
713 710 copy : bool (True)
714 711 Whether to return the bytes (True), or the non-copying Message
715 712 object in each place (False).
716 713
717 714 Returns
718 715 -------
719 716 msg : dict
720 717 The nested message dict with top-level keys [header, parent_header,
721 718 metadata, content, buffers].
722 719 """
723 720 minlen = 5
724 721 message = {}
725 722 if not copy:
726 723 for i in range(minlen):
727 724 msg_list[i] = msg_list[i].bytes
728 725 if self.auth is not None:
729 726 signature = msg_list[0]
730 727 if not signature:
731 728 raise ValueError("Unsigned Message")
732 729 if signature in self.digest_history:
733 730 raise ValueError("Duplicate Signature: %r"%signature)
734 731 self.digest_history.add(signature)
735 732 check = self.sign(msg_list[1:5])
736 733 if not signature == check:
737 734 raise ValueError("Invalid Signature: %r" % signature)
738 735 if len(msg_list) < minlen:
739 736 raise TypeError("malformed message, must have at least %i elements"%minlen)
740 737 header = self.unpack(msg_list[1])
741 738 message['header'] = header
742 739 message['msg_id'] = header['msg_id']
743 740 message['msg_type'] = header['msg_type']
744 741 message['parent_header'] = self.unpack(msg_list[2])
745 742 message['metadata'] = self.unpack(msg_list[3])
746 743 if content:
747 744 message['content'] = self.unpack(msg_list[4])
748 745 else:
749 746 message['content'] = msg_list[4]
750 747
751 748 message['buffers'] = msg_list[5:]
752 749 return message
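    # Illustrative sketch, where ``s`` is a Session instance: serialize and
    # unserialize are inverses once the idents and DELIM have been stripped off.
    #
    #     wire = s.serialize(s.msg('example_request'))
    #     idents, parts = s.feed_identities(wire)
    #     msg = s.unserialize(parts)
    #     # msg['msg_type'] == 'example_request'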
753 750
754 751 def test_msg2obj():
755 752 am = dict(x=1)
756 753 ao = Message(am)
757 754 assert ao.x == am['x']
758 755
759 756 am['y'] = dict(z=1)
760 757 ao = Message(am)
761 758 assert ao.y.z == am['y']['z']
762 759
763 760 k1, k2 = 'y', 'z'
764 761 assert ao[k1][k2] == am[k1][k2]
765 762
766 763 am2 = dict(ao)
767 764 assert am['x'] == am2['x']
768 765 assert am['y']['z'] == am2['y']['z']
769 766