##// END OF EJS Templates
update Session object per review...
MinRK -
Show More
@@ -1,538 +1,621 b''
1 1 #!/usr/bin/env python
2 """edited session.py to work with streams, and move msg_type to the header
2 """Session object for building, serializing, sending, and receiving messages in
3 IPython. The Session object supports serialization, HMAC signatures, and
4 metadata on messages.
5
6 Also defined here are utilities for working with Sessions:
7 * A SessionFactory to be used as a base class for configurables that work with
8 Sessions.
9 * A Message object for convenience that allows attribute-access to the msg dict.
3 10 """
4 11 #-----------------------------------------------------------------------------
5 12 # Copyright (C) 2010-2011 The IPython Development Team
6 13 #
7 14 # Distributed under the terms of the BSD License. The full license is in
8 15 # the file COPYING, distributed as part of this software.
9 16 #-----------------------------------------------------------------------------
10 17
11 18 #-----------------------------------------------------------------------------
12 19 # Imports
13 20 #-----------------------------------------------------------------------------
14 21
15 22 import hmac
16 23 import logging
17 24 import os
18 25 import pprint
19 26 import uuid
20 27 from datetime import datetime
21 28
22 29 try:
23 30 import cPickle
24 31 pickle = cPickle
25 32 except:
26 33 cPickle = None
27 34 import pickle
28 35
29 36 import zmq
30 37 from zmq.utils import jsonapi
31 38 from zmq.eventloop.ioloop import IOLoop
32 39 from zmq.eventloop.zmqstream import ZMQStream
33 40
34 from IPython.config.application import Application
35 from IPython.config.configurable import Configurable
41 from IPython.config.configurable import Configurable, LoggingConfigurable
36 42 from IPython.utils.importstring import import_item
37 43 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
38 44 from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set
39 45
40 46 #-----------------------------------------------------------------------------
41 47 # utility functions
42 48 #-----------------------------------------------------------------------------
43 49
def squash_unicode(obj):
    """Coerce unicode back to bytestrings, recursing into dicts and lists.

    Dicts and lists are modified in place and returned.  A unicode string is
    returned as a new utf8-encoded bytestring.  Any other object is returned
    unchanged.
    """
    if isinstance(obj, dict):
        # Iterate over a snapshot of the keys: unicode keys are popped and
        # re-inserted as bytestrings below, which must not disturb the loop.
        for key in list(obj.keys()):
            obj[key] = squash_unicode(obj[key])
            if isinstance(key, unicode):
                obj[squash_unicode(key)] = obj.pop(key)
    elif isinstance(obj, list):
        for i, v in enumerate(obj):
            obj[i] = squash_unicode(v)
    elif isinstance(obj, unicode):
        obj = obj.encode('utf8')
    return obj
57 63
58 64 #-----------------------------------------------------------------------------
59 65 # globals and defaults
60 66 #-----------------------------------------------------------------------------
# Keyword under which date_default is handed to jsonapi.dumps: 'on_unknown'
# when the underlying json module is jsonlib, 'default' otherwise.
key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
# JSON serializer pair.  Unpacking runs extract_dates over the loaded object
# and then squash_unicode over the result.
json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
json_unpacker = lambda s: squash_unicode(extract_dates(jsonapi.loads(s)))

# pickle serializer pair; -1 is passed to pickle.dumps as the protocol.
pickle_packer = lambda o: pickle.dumps(o,-1)
pickle_unpacker = pickle.loads

# Serializers a Session uses unless configured otherwise.
default_packer = json_packer
default_unpacker = json_unpacker


# Frame placed between the identity prefix and the signed message parts.
DELIM="<IDS|MSG>"
73 79
74 80 #-----------------------------------------------------------------------------
75 81 # Classes
76 82 #-----------------------------------------------------------------------------
77 83
class SessionFactory(LoggingConfigurable):
    """Base class for configurables that carry a Session, a zmq Context,
    a logger, and an IOLoop.
    """

    # Name of the logger to use; assigning it swaps self.log.
    logname = Unicode('')

    def _logname_changed(self, name, old, new):
        self.log = logging.getLogger(new)

    # not configurable:
    context = Instance('zmq.Context')

    def _context_default(self):
        return zmq.Context.instance()

    session = Instance('IPython.zmq.session.Session')

    loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)

    def _loop_default(self):
        return IOLoop.instance()

    def __init__(self, **kwargs):
        """Initialize the traits from kwargs and make sure a Session exists."""
        super(SessionFactory, self).__init__(**kwargs)

        # No session was supplied: build one from the same keyword arguments.
        if self.session is None:
            self.session = Session(**kwargs)
108 110
109 111
class Message(object):
    """A simple message object that maps dict keys to attributes.

    A Message can be created from a dict and a dict from a Message instance
    simply by calling dict(msg_obj).  Nested dicts are wrapped in Message
    objects as well, so ``msg.header.msg_id`` style access works.
    """

    def __init__(self, msg_dict):
        dct = self.__dict__
        # items() rather than iteritems(): same result on Python 2, and it
        # also exists on Python 3.
        for k, v in dict(msg_dict).items():
            if isinstance(v, dict):
                v = Message(v)
            dct[k] = v

    # Having this iterator lets dict(msg_obj) work out of the box.
    def __iter__(self):
        """Iterate over the (key, value) pairs of the message."""
        return iter(self.__dict__.items())

    def __repr__(self):
        return repr(self.__dict__)

    def __str__(self):
        return pprint.pformat(self.__dict__)

    def __contains__(self, k):
        return k in self.__dict__

    def __getitem__(self, k):
        """Dict-style access; raises KeyError for a missing key."""
        return self.__dict__[k]
139 141
def msg_header(msg_id, msg_type, username, session):
    """Build a message header dict.

    The header holds the four arguments under their own names plus a
    ``date`` key set to ``datetime.now()`` at the time of the call.
    """
    return {
        'msg_id': msg_id,
        'msg_type': msg_type,
        'username': username,
        'session': session,
        'date': datetime.now(),
    }
143 145
def extract_header(msg_or_header):
    """Given a message or header, return the header as a dict.

    A falsy argument yields an empty dict.  If the argument has a 'header'
    entry, that entry is the header; otherwise the argument itself is taken
    to be the header, provided it has a 'msg_id' entry.

    Raises KeyError if the argument has neither 'header' nor 'msg_id'.
    """
    if not msg_or_header:
        return {}
    try:
        # See if msg_or_header is the entire message.
        header = msg_or_header['header']
    except KeyError:
        # Otherwise it must be just the header; this lookup raises KeyError
        # when it is not.
        msg_or_header['msg_id']
        header = msg_or_header
    return header if isinstance(header, dict) else dict(header)
162 164
class Session(Configurable):
    """Object for handling serialization and sending of messages.

    The Session object handles building messages and sending them
    with ZMQ sockets or ZMQStream objects.  Objects can communicate with each
    other over the network via Session objects, and only need to work with the
    dict-based IPython message spec. The Session will handle
    serialization/deserialization, security, and metadata.

    Sessions support configurable serialization via packer/unpacker traits,
    and signing with HMAC digests via the key/keyfile traits.

    Parameters
    ----------

    debug : bool
        whether to trigger extra debugging statements
    packer/unpacker : str : 'json', 'pickle' or import_string
        importstrings for methods to serialize message parts.  If just
        'json' or 'pickle', predefined JSON and pickle packers will be used.
        Otherwise, the entire importstring must be used.

        The functions must accept at least valid JSON input, and output *bytes*.

        For example, to use msgpack:
        packer = 'msgpack.packb', unpacker='msgpack.unpackb'
    pack/unpack : callables
        You can also set the pack/unpack callables for serialization directly.
    session : bytes
        the ID of this Session object.  The default is to generate a new UUID.
    username : unicode
        username added to message headers.  The default is to ask the OS.
    key : bytes
        The key used to initialize an HMAC signature.  If unset, messages
        will not be signed or checked.
    keyfile : filepath
        The file containing a key.  If this is set, `key` will be initialized
        to the contents of the file.

    """

    debug = Bool(False, config=True, help="""Debug output in the Session""")

    packer = Unicode('json', config=True,
        help="""The name of the packer for serializing messages.
            Should be one of 'json', 'pickle', or an import name
            for a custom callable serializer.""")
    def _packer_changed(self, name, old, new):
        # 'json' and 'pickle' select a matched pack/unpack pair; any other
        # value is an import string for the pack callable only.
        if new.lower() == 'json':
            self.pack = json_packer
            self.unpack = json_unpacker
        elif new.lower() == 'pickle':
            self.pack = pickle_packer
            self.unpack = pickle_unpacker
        else:
            self.pack = import_item(str(new))

    unpacker = Unicode('json', config=True,
        help="""The name of the unpacker for unserializing messages.
        Only used with custom functions for `packer`.""")
    def _unpacker_changed(self, name, old, new):
        # Mirror of _packer_changed: a custom value replaces unpack only.
        if new.lower() == 'json':
            self.pack = json_packer
            self.unpack = json_unpacker
        elif new.lower() == 'pickle':
            self.pack = pickle_packer
            self.unpack = pickle_unpacker
        else:
            self.unpack = import_item(str(new))

    session = CStr('', config=True,
        help="""The UUID identifying this session.""")
    def _session_default(self):
        return bytes(uuid.uuid4())

    username = Unicode(os.environ.get('USER','username'), config=True,
        help="""Username for the Session. Default is your system username.""")

    # message signature related traits:
    key = CStr('', config=True,
        help="""execution key, for extra authentication.""")
    def _key_changed(self, name, old, new):
        # A non-empty key enables signing; an empty one disables it.
        if new:
            self.auth = hmac.HMAC(new)
        else:
            self.auth = None
    auth = Instance(hmac.HMAC)
    # Signatures already accepted by unpack_message; a repeat is rejected.
    digest_history = Set()

    keyfile = Unicode('', config=True,
        help="""path to file containing execution key.""")
    def _keyfile_changed(self, name, old, new):
        with open(new, 'rb') as f:
            self.key = f.read().strip()

    pack = Any(default_packer) # the actual packer function
    def _pack_changed(self, name, old, new):
        if not callable(new):
            raise TypeError("packer must be callable, not %s"%type(new))

    unpack = Any(default_unpacker) # the actual unpacker function
    def _unpack_changed(self, name, old, new):
        # Only callability is checked here; whether unpack really inverts
        # pack is checked by _check_packers at construction time.
        if not callable(new):
            raise TypeError("unpacker must be callable, not %s"%type(new))

    def __init__(self, **kwargs):
        """create a Session object

        Parameters
        ----------

        debug : bool
            whether to trigger extra debugging statements
        packer/unpacker : str : 'json', 'pickle' or import_string
            importstrings for methods to serialize message parts.  If just
            'json' or 'pickle', predefined JSON and pickle packers will be used.
            Otherwise, the entire importstring must be used.

            The functions must accept at least valid JSON input, and output
            *bytes*.

            For example, to use msgpack:
            packer = 'msgpack.packb', unpacker='msgpack.unpackb'
        pack/unpack : callables
            You can also set the pack/unpack callables for serialization
            directly.
        session : bytes
            the ID of this Session object.  The default is to generate a new
            UUID.
        username : unicode
            username added to message headers.  The default is to ask the OS.
        key : bytes
            The key used to initialize an HMAC signature.  If unset, messages
            will not be signed or checked.
        keyfile : filepath
            The file containing a key.  If this is set, `key` will be
            initialized to the contents of the file.
        """
        super(Session, self).__init__(**kwargs)
        self._check_packers()
        # Pre-packed empty dict, used by serialize when content is None.
        self.none = self.pack({})

    @property
    def msg_id(self):
        """always return new uuid"""
        return str(uuid.uuid4())

    def _check_packers(self):
        """check packers for binary data and datetime support.

        Raises ValueError if pack cannot serialize a simple message, does not
        produce bytes, or if unpack cannot read pack's output.  If a datetime
        does not survive a pack/unpack round trip without an exception, pack
        and unpack are wrapped with squash_dates / extract_dates.
        """
        pack = self.pack
        unpack = self.unpack

        # check simple serialization
        msg = dict(a=[1,'hi'])
        try:
            packed = pack(msg)
        except Exception:
            raise ValueError("packer could not serialize a simple message")

        # ensure packed message is bytes
        if not isinstance(packed, bytes):
            raise ValueError("message packed to %r, but bytes are required"%type(packed))

        # check that unpack is pack's inverse
        try:
            unpack(packed)
        except Exception:
            raise ValueError("unpacker could not handle the packer's output")

        # check datetime support
        msg = dict(t=datetime.now())
        try:
            unpack(pack(msg))
        except Exception:
            self.pack = lambda o: pack(squash_dates(o))
            self.unpack = lambda s: extract_dates(unpack(s))

    def msg_header(self, msg_type):
        """Return a new header dict for a message of the given type."""
        return msg_header(self.msg_id, msg_type, self.username, self.session)

    def msg(self, msg_type, content=None, parent=None, subheader=None):
        """Build a message dict of the given type.

        The result has 'header', 'msg_id', 'parent_header', 'msg_type' and
        'content' keys.  Any `subheader` entries are merged into the header.
        """
        msg = {}
        msg['header'] = self.msg_header(msg_type)
        msg['msg_id'] = msg['header']['msg_id']
        msg['parent_header'] = {} if parent is None else extract_header(parent)
        msg['msg_type'] = msg_type
        msg['content'] = {} if content is None else content
        sub = {} if subheader is None else subheader
        msg['header'].update(sub)
        return msg

    def sign(self, msg):
        """Sign a message with HMAC digest. If no auth, return b''.

        `msg` is an iterable of message parts; each part is fed in order
        into a copy of self.auth and the hex digest is returned.
        """
        if self.auth is None:
            return b''
        h = self.auth.copy()
        for m in msg:
            h.update(m)
        return h.hexdigest()

    def serialize(self, msg, ident=None):
        """Serialize the message components to bytes.

        The result is laid out as
        [ident..., DELIM, signature, header, parent_header, content].

        Raises TypeError if the content is not None, a dict, bytes or
        unicode.

        Returns
        -------

        list of bytes objects

        """
        content = msg.get('content', {})
        if content is None:
            content = self.none
        elif isinstance(content, dict):
            content = self.pack(content)
        elif isinstance(content, bytes):
            # content is already packed, as in a relayed message
            pass
        elif isinstance(content, unicode):
            # should be bytes, but JSON often spits out unicode
            content = content.encode('utf8')
        else:
            raise TypeError("Content incorrect type: %s"%type(content))

        real_message = [self.pack(msg['header']),
                        self.pack(msg['parent_header']),
                        content
        ]

        to_send = []

        if isinstance(ident, list):
            # accept list of idents
            to_send.extend(ident)
        elif ident is not None:
            to_send.append(ident)
        to_send.append(DELIM)

        signature = self.sign(real_message)
        to_send.append(signature)

        to_send.extend(real_message)

        return to_send

    def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
             buffers=None, subheader=None, track=False):
        """Build and send a message via stream or socket.

        Parameters
        ----------

        stream : zmq.Socket or ZMQStream
            the socket-like object used to send the data
        msg_or_type : str or Message/dict
            Normally, msg_or_type will be a msg_type unless a message is being
            sent more than once.

        content : dict or None
            the content of the message (ignored if msg_or_type is a message)
        parent : Message or dict or None
            the parent or parent header describing the parent of this message
        ident : bytes or list of bytes
            the zmq.IDENTITY routing path
        subheader : dict or None
            extra header keys for this message's header
        buffers : list or None
            the already-serialized buffers to be appended to the message
        track : bool
            whether to track.  Only for use with Sockets,
            because ZMQStream objects cannot track messages.

        Returns
        -------
        msg : message dict
            the constructed message.  The value returned by the final send
            call is stored under msg['tracker'].

        Raises
        ------
        TypeError
            if stream is neither a Socket nor a ZMQStream, or if track is
            requested on a ZMQStream.

        """

        if not isinstance(stream, (zmq.Socket, ZMQStream)):
            raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
        elif track and isinstance(stream, ZMQStream):
            raise TypeError("ZMQStream cannot track messages")

        if isinstance(msg_or_type, (Message, dict)):
            # we got a Message, not a msg_type
            # don't build a new Message
            msg = msg_or_type
        else:
            msg = self.msg(msg_or_type, content, parent, subheader)

        buffers = [] if buffers is None else buffers
        to_send = self.serialize(msg, ident)
        flag = 0
        if buffers:
            # more frames follow, so only the last buffer is tracked
            flag = zmq.SNDMORE
            _track = False
        else:
            _track = track
        if track:
            tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
        else:
            tracker = stream.send_multipart(to_send, flag, copy=False)
        for b in buffers[:-1]:
            stream.send(b, flag, copy=False)
        if buffers:
            if track:
                tracker = stream.send(buffers[-1], copy=False, track=track)
            else:
                tracker = stream.send(buffers[-1], copy=False)

        if self.debug:
            pprint.pprint(msg)
            pprint.pprint(to_send)
            pprint.pprint(buffers)

        msg['tracker'] = tracker

        return msg

    def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
        """Send a raw message via ident path.

        The frames sent are [ident..., DELIM, signature] followed by the
        parts of `msg`.

        Parameters
        ----------
        msg : list of sendable buffers"""
        to_send = []
        if isinstance(ident, bytes):
            ident = [ident]
        if ident is not None:
            to_send.extend(ident)

        to_send.append(DELIM)
        to_send.append(self.sign(msg))
        to_send.extend(msg)
        # Send the assembled frames; sending the bare msg would drop the
        # routing prefix, delimiter and signature.
        stream.send_multipart(to_send, flags, copy=copy)

    def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
        """receives and unpacks a message
        returns [idents], msg

        Returns (None, None) if the receive fails with EAGAIN.  Any other
        ZMQError, and any error raised while unpacking, is re-raised.
        """
        if isinstance(socket, ZMQStream):
            socket = socket.socket
        try:
            # Honour copy here: feed_identities and unpack_message expect
            # non-copying message objects when copy is False.
            msg = socket.recv_multipart(mode, copy=copy)
        except zmq.ZMQError as e:
            if e.errno == zmq.EAGAIN:
                # We can convert EAGAIN to None as we know in this case
                # recv_multipart won't return None.
                return None,None
            else:
                raise
        # split multipart message into identity list and message dict
        # invalid large messages can cause very expensive string comparisons
        idents, msg = self.feed_identities(msg, copy)
        try:
            return idents, self.unpack_message(msg, content=content, copy=copy)
        except Exception:
            print (idents, msg)
            # TODO: handle it
            # bare raise keeps the original traceback
            raise

    def feed_identities(self, msg, copy=True):
        """feed until DELIM is reached, then return the prefix as idents and
        remainder as msg. This is easily broken by setting an IDENT to DELIM,
        but that would be silly.

        Parameters
        ----------
        msg : a list of Message or bytes objects
            the message to be split
        copy : bool
            flag determining whether the arguments are bytes or Messages

        Returns
        -------
        (idents,msg) : two lists
            idents will always be a list of bytes - the identity prefix
            msg will be a list of bytes or Messages, unchanged from input
            msg should be unpackable via self.unpack_message at this point.

        Raises
        ------
        ValueError
            if DELIM is not found in msg.
        """
        if copy:
            idx = msg.index(DELIM)
            return msg[:idx], msg[idx+1:]
        else:
            failed = True
            for idx,m in enumerate(msg):
                if m.bytes == DELIM:
                    failed = False
                    break
            if failed:
                raise ValueError("DELIM not in msg")
            idents, msg = msg[:idx], msg[idx+1:]
            return [m.bytes for m in idents], msg

    def unpack_message(self, msg, content=True, copy=True):
        """Return a message object from the format
        sent by self.send.

        Parameters:
        -----------

        msg : list
            [signature, header, parent_header, content, buffers...], i.e.
            the part of a received message after the DELIM frame

        content : bool (True)
            whether to unpack the content dict (True),
            or leave it serialized (False)

        copy : bool (True)
            whether to return the bytes (True),
            or the non-copying Message object in each place (False)

        Raises
        ------
        TypeError
            if msg has fewer than four parts.
        ValueError
            if signing is enabled and the signature was seen before or does
            not match.

        """
        minlen = 4
        # Validate the length before touching msg[0..3], so a short message
        # is reported as malformed rather than as an index/signature error.
        if not len(msg) >= minlen:
            raise TypeError("malformed message, must have at least %i elements"%minlen)
        message = {}
        if not copy:
            for i in range(minlen):
                msg[i] = msg[i].bytes
        if self.auth is not None:
            signature = msg[0]
            if signature in self.digest_history:
                raise ValueError("Duplicate Signature: %r"%signature)
            check = self.sign(msg[1:4])
            if not signature == check:
                raise ValueError("Invalid Signature: %r"%signature)
            # Remember only signatures that verified, so bogus messages
            # cannot fill the history.
            self.digest_history.add(signature)
        message['header'] = self.unpack(msg[1])
        message['msg_type'] = message['header']['msg_type']
        message['parent_header'] = self.unpack(msg[2])
        if content:
            message['content'] = self.unpack(msg[3])
        else:
            message['content'] = msg[3]

        message['buffers'] = msg[4:]
        return message
523 605
def test_msg2obj():
    """Check attribute access, item access and dict() round-tripping of
    Message objects, including one level of nesting."""
    source = {'x': 1}
    wrapped = Message(source)
    assert wrapped.x == source['x']

    # nested dicts become nested Messages
    source['y'] = {'z': 1}
    wrapped = Message(source)
    assert wrapped.y.z == source['y']['z']

    # item access mirrors the dict
    outer, inner = 'y', 'z'
    assert wrapped[outer][inner] == source[outer][inner]

    # dict(msg) gives back the top-level mapping
    roundtrip = dict(wrapped)
    assert source['x'] == roundtrip['x']
    assert source['y']['z'] == roundtrip['y']['z']
621
General Comments 0
You need to be logged in to leave comments. Login now