session.py
767 lines
| 26.7 KiB
| text/x-python
|
PythonLexer
MinRK
|
r4017 | """Session object for building, serializing, sending, and receiving messages in | ||
IPython. The Session object supports serialization, HMAC signatures, and | ||||
metadata on messages. | ||||
Also defined here are utilities for working with Sessions: | ||||
* A SessionFactory to be used as a base class for configurables that work with | ||||
Sessions. | ||||
* A Message object for convenience that allows attribute-access to the msg dict. | ||||
MinRK
|
r4018 | |||
Authors: | ||||
* Min RK | ||||
* Brian Granger | ||||
* Fernando Perez | ||||
MinRK
|
r4006 | """ | ||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2010-2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
import hmac | ||||
MinRK
|
r4007 | import logging | ||
Fernando Perez
|
r2597 | import os | ||
import pprint | ||||
MinRK
|
r4006 | import uuid | ||
from datetime import datetime | ||||
try: | ||||
import cPickle | ||||
pickle = cPickle | ||||
except: | ||||
cPickle = None | ||||
import pickle | ||||
Fernando Perez
|
r2597 | |||
import zmq | ||||
MinRK
|
r4006 | from zmq.utils import jsonapi | ||
MinRK
|
r4007 | from zmq.eventloop.ioloop import IOLoop | ||
MinRK
|
r4006 | from zmq.eventloop.zmqstream import ZMQStream | ||
MinRK
|
r4962 | from IPython.config.application import Application, boolean_flag | ||
MinRK
|
r4017 | from IPython.config.configurable import Configurable, LoggingConfigurable | ||
MinRK
|
r4006 | from IPython.utils.importstring import import_item | ||
MinRK
|
r4008 | from IPython.utils.jsonutil import extract_dates, squash_dates, date_default | ||
Thomas Kluyver
|
r4734 | from IPython.utils.py3compat import str_to_bytes | ||
MinRK
|
r4091 | from IPython.utils.traitlets import (CBytes, Unicode, Bool, Any, Instance, Set, | ||
MinRK
|
r8033 | DottedObjectName, CUnicode, Dict, Integer) | ||
from IPython.zmq.serialize import MAX_ITEMS, MAX_BYTES | ||||
MinRK
|
r4006 | |||
#----------------------------------------------------------------------------- | ||||
# utility functions | ||||
#----------------------------------------------------------------------------- | ||||
def squash_unicode(obj): | ||||
"""coerce unicode back to bytestrings.""" | ||||
if isinstance(obj,dict): | ||||
for key in obj.keys(): | ||||
obj[key] = squash_unicode(obj[key]) | ||||
if isinstance(key, unicode): | ||||
obj[squash_unicode(key)] = obj.pop(key) | ||||
elif isinstance(obj, list): | ||||
for i,v in enumerate(obj): | ||||
obj[i] = squash_unicode(v) | ||||
elif isinstance(obj, unicode): | ||||
obj = obj.encode('utf8') | ||||
return obj | ||||
#----------------------------------------------------------------------------- | ||||
# globals and defaults | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r4962 | |||
MinRK
|
r6060 | |||
MinRK
|
r6629 | # ISO8601-ify datetime objects | ||
json_packer = lambda obj: jsonapi.dumps(obj, default=date_default) | ||||
json_unpacker = lambda s: extract_dates(jsonapi.loads(s)) | ||||
Fernando Perez
|
r2597 | |||
MinRK
|
r4006 | pickle_packer = lambda o: pickle.dumps(o,-1) | ||
pickle_unpacker = pickle.loads | ||||
default_packer = json_packer | ||||
default_unpacker = json_unpacker | ||||
MinRK
|
r7968 | DELIM = b"<IDS|MSG>" | ||
# singleton dummy tracker, which will always report as done | ||||
DONE = zmq.MessageTracker() | ||||
MinRK
|
r4962 | |||
#----------------------------------------------------------------------------- | ||||
# Mixin tools for apps that use Sessions | ||||
#----------------------------------------------------------------------------- | ||||
session_aliases = dict( | ||||
ident = 'Session.session', | ||||
user = 'Session.username', | ||||
keyfile = 'Session.keyfile', | ||||
) | ||||
session_flags = { | ||||
'secure' : ({'Session' : { 'key' : str_to_bytes(str(uuid.uuid4())), | ||||
'keyfile' : '' }}, | ||||
"""Use HMAC digests for authentication of messages. | ||||
Setting this flag will generate a new UUID to use as the HMAC key. | ||||
"""), | ||||
'no-secure' : ({'Session' : { 'key' : b'', 'keyfile' : '' }}, | ||||
"""Don't authenticate messages."""), | ||||
} | ||||
def default_secure(cfg): | ||||
"""Set the default behavior for a config environment to be secure. | ||||
If Session.key/keyfile have not been set, set Session.key to | ||||
a new random UUID. | ||||
""" | ||||
if 'Session' in cfg: | ||||
if 'key' in cfg.Session or 'keyfile' in cfg.Session: | ||||
return | ||||
# key/keyfile not specified, generate new UUID: | ||||
cfg.Session.key = str_to_bytes(str(uuid.uuid4())) | ||||
MinRK
|
r4006 | #----------------------------------------------------------------------------- | ||
# Classes | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3269 | |||
MinRK
|
r4017 | class SessionFactory(LoggingConfigurable): | ||
MinRK
|
r4007 | """The Base class for configurables that have a Session, Context, logger, | ||
and IOLoop. | ||||
""" | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4007 | logname = Unicode('') | ||
def _logname_changed(self, name, old, new): | ||||
self.log = logging.getLogger(new) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4007 | # not configurable: | ||
context = Instance('zmq.Context') | ||||
def _context_default(self): | ||||
return zmq.Context.instance() | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4007 | session = Instance('IPython.zmq.session.Session') | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4007 | loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False) | ||
def _loop_default(self): | ||||
return IOLoop.instance() | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4007 | def __init__(self, **kwargs): | ||
super(SessionFactory, self).__init__(**kwargs) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4007 | if self.session is None: | ||
# construct the session | ||||
self.session = Session(**kwargs) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4007 | |||
Fernando Perez
|
r2597 | class Message(object): | ||
"""A simple message object that maps dict keys to attributes. | ||||
A Message can be created from a dict and a dict from a Message instance | ||||
simply by calling dict(msg_obj).""" | ||||
Bernardo B. Marques
|
r4872 | |||
Fernando Perez
|
r2597 | def __init__(self, msg_dict): | ||
dct = self.__dict__ | ||||
MinRK
|
r4006 | for k, v in dict(msg_dict).iteritems(): | ||
Fernando Perez
|
r2597 | if isinstance(v, dict): | ||
v = Message(v) | ||||
dct[k] = v | ||||
# Having this iterator lets dict(msg_obj) work out of the box. | ||||
def __iter__(self): | ||||
return iter(self.__dict__.iteritems()) | ||||
Bernardo B. Marques
|
r4872 | |||
Fernando Perez
|
r2597 | def __repr__(self): | ||
return repr(self.__dict__) | ||||
def __str__(self): | ||||
return pprint.pformat(self.__dict__) | ||||
def __contains__(self, k): | ||||
return k in self.__dict__ | ||||
def __getitem__(self, k): | ||||
return self.__dict__[k] | ||||
MinRK
|
r4006 | def msg_header(msg_id, msg_type, username, session): | ||
MinRK
|
r4008 | date = datetime.now() | ||
MinRK
|
r4006 | return locals() | ||
Fernando Perez
|
r2597 | |||
def extract_header(msg_or_header): | ||||
"""Given a message or header, return the header.""" | ||||
if not msg_or_header: | ||||
return {} | ||||
try: | ||||
# See if msg_or_header is the entire message. | ||||
h = msg_or_header['header'] | ||||
except KeyError: | ||||
try: | ||||
# See if msg_or_header is just the header | ||||
h = msg_or_header['msg_id'] | ||||
except KeyError: | ||||
raise | ||||
else: | ||||
h = msg_or_header | ||||
if not isinstance(h, dict): | ||||
h = dict(h) | ||||
return h | ||||
MinRK
|
r4006 | class Session(Configurable): | ||
MinRK
|
r4017 | """Object for handling serialization and sending of messages. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4017 | The Session object handles building messages and sending them | ||
with ZMQ sockets or ZMQStream objects. Objects can communicate with each | ||||
other over the network via Session objects, and only need to work with the | ||||
Bernardo B. Marques
|
r4872 | dict-based IPython message spec. The Session will handle | ||
MinRK
|
r4017 | serialization/deserialization, security, and metadata. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4017 | Sessions support configurable serialiization via packer/unpacker traits, | ||
and signing with HMAC digests via the key/keyfile traits. | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4017 | Parameters | ||
---------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4017 | debug : bool | ||
whether to trigger extra debugging statements | ||||
packer/unpacker : str : 'json', 'pickle' or import_string | ||||
importstrings for methods to serialize message parts. If just | ||||
'json' or 'pickle', predefined JSON and pickle packers will be used. | ||||
Otherwise, the entire importstring must be used. | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4017 | The functions must accept at least valid JSON input, and output *bytes*. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4017 | For example, to use msgpack: | ||
packer = 'msgpack.packb', unpacker='msgpack.unpackb' | ||||
pack/unpack : callables | ||||
You can also set the pack/unpack callables for serialization directly. | ||||
session : bytes | ||||
the ID of this Session object. The default is to generate a new UUID. | ||||
username : unicode | ||||
username added to message headers. The default is to ask the OS. | ||||
key : bytes | ||||
The key used to initialize an HMAC signature. If unset, messages | ||||
will not be signed or checked. | ||||
keyfile : filepath | ||||
The file containing a key. If this is set, `key` will be initialized | ||||
to the contents of the file. | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4017 | """ | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | debug=Bool(False, config=True, help="""Debug output in the Session""") | ||
Bernardo B. Marques
|
r4872 | |||
Thomas Kluyver
|
r4055 | packer = DottedObjectName('json',config=True, | ||
MinRK
|
r4006 | help="""The name of the packer for serializing messages. | ||
Should be one of 'json', 'pickle', or an import name | ||||
MinRK
|
r4008 | for a custom callable serializer.""") | ||
MinRK
|
r4006 | def _packer_changed(self, name, old, new): | ||
if new.lower() == 'json': | ||||
self.pack = json_packer | ||||
self.unpack = json_unpacker | ||||
MinRK
|
r7889 | self.unpacker = new | ||
MinRK
|
r4006 | elif new.lower() == 'pickle': | ||
self.pack = pickle_packer | ||||
self.unpack = pickle_unpacker | ||||
MinRK
|
r7889 | self.unpacker = new | ||
Brian Granger
|
r2606 | else: | ||
MinRK
|
r4008 | self.pack = import_item(str(new)) | ||
Fernando Perez
|
r2597 | |||
Thomas Kluyver
|
r4055 | unpacker = DottedObjectName('json', config=True, | ||
MinRK
|
r4006 | help="""The name of the unpacker for unserializing messages. | ||
Only used with custom functions for `packer`.""") | ||||
def _unpacker_changed(self, name, old, new): | ||||
if new.lower() == 'json': | ||||
self.pack = json_packer | ||||
self.unpack = json_unpacker | ||||
MinRK
|
r7889 | self.packer = new | ||
MinRK
|
r4006 | elif new.lower() == 'pickle': | ||
self.pack = pickle_packer | ||||
self.unpack = pickle_unpacker | ||||
MinRK
|
r7889 | self.packer = new | ||
MinRK
|
r4006 | else: | ||
MinRK
|
r4008 | self.unpack = import_item(str(new)) | ||
Bernardo B. Marques
|
r4872 | |||
Thomas Kluyver
|
r4735 | session = CUnicode(u'', config=True, | ||
MinRK
|
r4006 | help="""The UUID identifying this session.""") | ||
def _session_default(self): | ||||
MinRK
|
r4770 | u = unicode(uuid.uuid4()) | ||
self.bsession = u.encode('ascii') | ||||
return u | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4770 | def _session_changed(self, name, old, new): | ||
self.bsession = self.session.encode('ascii') | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4770 | # bsession is the session as bytes | ||
bsession = CBytes(b'') | ||||
Bernardo B. Marques
|
r4872 | |||
Brian E. Granger
|
r4234 | username = Unicode(os.environ.get('USER',u'username'), config=True, | ||
MinRK
|
r4006 | help="""Username for the Session. Default is your system username.""") | ||
Bernardo B. Marques
|
r4872 | |||
Jason Grout
|
r7952 | metadata = Dict({}, config=True, | ||
help="""Metadata dictionary, which serves as the default top-level metadata dict for each message.""") | ||||
Jason Grout
|
r7950 | |||
MinRK
|
r4006 | # message signature related traits: | ||
MinRK
|
r4962 | |||
MinRK
|
r4091 | key = CBytes(b'', config=True, | ||
MinRK
|
r4006 | help="""execution key, for extra authentication.""") | ||
def _key_changed(self, name, old, new): | ||||
if new: | ||||
self.auth = hmac.HMAC(new) | ||||
else: | ||||
self.auth = None | ||||
auth = Instance(hmac.HMAC) | ||||
digest_history = Set() | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | keyfile = Unicode('', config=True, | ||
help="""path to file containing execution key.""") | ||||
def _keyfile_changed(self, name, old, new): | ||||
with open(new, 'rb') as f: | ||||
self.key = f.read().strip() | ||||
Fernando Perez
|
r2597 | |||
MinRK
|
r4962 | # serialization traits: | ||
MinRK
|
r4006 | pack = Any(default_packer) # the actual packer function | ||
def _pack_changed(self, name, old, new): | ||||
if not callable(new): | ||||
raise TypeError("packer must be callable, not %s"%type(new)) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | unpack = Any(default_unpacker) # the actual packer function | ||
def _unpack_changed(self, name, old, new): | ||||
Bernardo B. Marques
|
r4872 | # unpacker is not checked - it is assumed to be | ||
MinRK
|
r4006 | if not callable(new): | ||
MinRK
|
r4008 | raise TypeError("unpacker must be callable, not %s"%type(new)) | ||
MinRK
|
r7968 | |||
MinRK
|
r8033 | # thresholds: | ||
copy_threshold = Integer(2**16, config=True, | ||||
MinRK
|
r7968 | help="Threshold (in bytes) beyond which a buffer should be sent without copying.") | ||
MinRK
|
r8033 | buffer_threshold = Integer(MAX_BYTES, config=True, | ||
help="Threshold (in bytes) beyond which an object's buffer should be extracted to avoid pickling.") | ||||
item_threshold = Integer(MAX_ITEMS, config=True, | ||||
help="""The maximum number of items for a container to be introspected for custom serialization. | ||||
Containers larger than this are pickled outright. | ||||
""" | ||||
) | ||||
MinRK
|
r4006 | def __init__(self, **kwargs): | ||
MinRK
|
r4017 | """create a Session object | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4017 | Parameters | ||
---------- | ||||
debug : bool | ||||
whether to trigger extra debugging statements | ||||
packer/unpacker : str : 'json', 'pickle' or import_string | ||||
importstrings for methods to serialize message parts. If just | ||||
'json' or 'pickle', predefined JSON and pickle packers will be used. | ||||
Otherwise, the entire importstring must be used. | ||||
The functions must accept at least valid JSON input, and output | ||||
*bytes*. | ||||
For example, to use msgpack: | ||||
packer = 'msgpack.packb', unpacker='msgpack.unpackb' | ||||
pack/unpack : callables | ||||
You can also set the pack/unpack callables for serialization | ||||
directly. | ||||
MinRK
|
r4770 | session : unicode (must be ascii) | ||
Bernardo B. Marques
|
r4872 | the ID of this Session object. The default is to generate a new | ||
MinRK
|
r4017 | UUID. | ||
MinRK
|
r4770 | bsession : bytes | ||
The session as bytes | ||||
MinRK
|
r4017 | username : unicode | ||
username added to message headers. The default is to ask the OS. | ||||
key : bytes | ||||
The key used to initialize an HMAC signature. If unset, messages | ||||
will not be signed or checked. | ||||
keyfile : filepath | ||||
Bernardo B. Marques
|
r4872 | The file containing a key. If this is set, `key` will be | ||
MinRK
|
r4017 | initialized to the contents of the file. | ||
""" | ||||
MinRK
|
r4006 | super(Session, self).__init__(**kwargs) | ||
MinRK
|
r4008 | self._check_packers() | ||
MinRK
|
r4006 | self.none = self.pack({}) | ||
MinRK
|
r4770 | # ensure self._session_default() if necessary, so bsession is defined: | ||
self.session | ||||
MinRK
|
r4006 | |||
@property | ||||
def msg_id(self): | ||||
"""always return new uuid""" | ||||
return str(uuid.uuid4()) | ||||
MinRK
|
r4008 | def _check_packers(self): | ||
"""check packers for binary data and datetime support.""" | ||||
pack = self.pack | ||||
unpack = self.unpack | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4008 | # check simple serialization | ||
msg = dict(a=[1,'hi']) | ||||
try: | ||||
packed = pack(msg) | ||||
except Exception: | ||||
raise ValueError("packer could not serialize a simple message") | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4008 | # ensure packed message is bytes | ||
if not isinstance(packed, bytes): | ||||
raise ValueError("message packed to %r, but bytes are required"%type(packed)) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4008 | # check that unpack is pack's inverse | ||
try: | ||||
unpacked = unpack(packed) | ||||
except Exception: | ||||
raise ValueError("unpacker could not handle the packer's output") | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4008 | # check datetime support | ||
msg = dict(t=datetime.now()) | ||||
try: | ||||
unpacked = unpack(pack(msg)) | ||||
except Exception: | ||||
self.pack = lambda o: pack(squash_dates(o)) | ||||
self.unpack = lambda s: extract_dates(unpack(s)) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | def msg_header(self, msg_type): | ||
return msg_header(self.msg_id, msg_type, self.username, self.session) | ||||
MinRK
|
r7956 | def msg(self, msg_type, content=None, parent=None, header=None, metadata=None): | ||
Brian E. Granger
|
r4220 | """Return the nested message dict. | ||
This format is different from what is sent over the wire. The | ||||
Brian E. Granger
|
r4232 | serialize/unserialize methods converts this nested message dict to the wire | ||
format, which is a list of message parts. | ||||
Brian E. Granger
|
r4220 | """ | ||
Fernando Perez
|
r2597 | msg = {} | ||
MinRK
|
r4711 | header = self.msg_header(msg_type) if header is None else header | ||
msg['header'] = header | ||||
msg['msg_id'] = header['msg_id'] | ||||
msg['msg_type'] = header['msg_type'] | ||||
Fernando Perez
|
r2597 | msg['parent_header'] = {} if parent is None else extract_header(parent) | ||
msg['content'] = {} if content is None else content | ||||
Jason Grout
|
r7955 | msg['metadata'] = self.metadata.copy() | ||
Jason Grout
|
r7952 | if metadata is not None: | ||
Jason Grout
|
r7955 | msg['metadata'].update(metadata) | ||
Fernando Perez
|
r2597 | return msg | ||
Brian E. Granger
|
r4220 | def sign(self, msg_list): | ||
"""Sign a message with HMAC digest. If no auth, return b''. | ||||
Parameters | ||||
---------- | ||||
msg_list : list | ||||
Bernardo B. Marques
|
r4872 | The [p_header,p_parent,p_content] part of the message list. | ||
Brian E. Granger
|
r4220 | """ | ||
MinRK
|
r4006 | if self.auth is None: | ||
return b'' | ||||
h = self.auth.copy() | ||||
Brian E. Granger
|
r4220 | for m in msg_list: | ||
MinRK
|
r4006 | h.update(m) | ||
Thomas Kluyver
|
r4732 | return str_to_bytes(h.hexdigest()) | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | def serialize(self, msg, ident=None): | ||
MinRK
|
r4017 | """Serialize the message components to bytes. | ||
Brian E. Granger
|
r4220 | |||
Brian E. Granger
|
r4231 | This is roughly the inverse of unserialize. The serialize/unserialize | ||
methods work with full message lists, whereas pack/unpack work with | ||||
the individual message parts in the message list. | ||||
Brian E. Granger
|
r4220 | Parameters | ||
---------- | ||||
msg : dict or Message | ||||
The nexted message dict as returned by the self.msg method. | ||||
MinRK
|
r4017 | Returns | ||
------- | ||||
Brian E. Granger
|
r4220 | msg_list : list | ||
The list of bytes objects to be sent with the format: | ||||
Jason Grout
|
r7955 | [ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_metadata,p_content, | ||
Brian E. Granger
|
r4220 | buffer1,buffer2,...]. In this list, the p_* entities are | ||
the packed or serialized versions, so if JSON is used, these | ||||
Thomas Kluyver
|
r4733 | are utf8 encoded JSON strings. | ||
MinRK
|
r4017 | """ | ||
MinRK
|
r4006 | content = msg.get('content', {}) | ||
if content is None: | ||||
content = self.none | ||||
elif isinstance(content, dict): | ||||
content = self.pack(content) | ||||
elif isinstance(content, bytes): | ||||
# content is already packed, as in a relayed message | ||||
pass | ||||
elif isinstance(content, unicode): | ||||
# should be bytes, but JSON often spits out unicode | ||||
content = content.encode('utf8') | ||||
else: | ||||
raise TypeError("Content incorrect type: %s"%type(content)) | ||||
Bernardo B. Marques
|
r4872 | |||
real_message = [self.pack(msg['header']), | ||||
self.pack(msg['parent_header']), | ||||
Jason Grout
|
r7955 | self.pack(msg['metadata']), | ||
content, | ||||
MinRK
|
r4006 | ] | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | to_send = [] | ||
if isinstance(ident, list): | ||||
# accept list of idents | ||||
to_send.extend(ident) | ||||
elif ident is not None: | ||||
to_send.append(ident) | ||||
to_send.append(DELIM) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | signature = self.sign(real_message) | ||
to_send.append(signature) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | to_send.extend(real_message) | ||
return to_send | ||||
Bernardo B. Marques
|
r4872 | |||
Brian E. Granger
|
r4233 | def send(self, stream, msg_or_type, content=None, parent=None, ident=None, | ||
MinRK
|
r7956 | buffers=None, track=False, header=None, metadata=None): | ||
MinRK
|
r4006 | """Build and send a message via stream or socket. | ||
Brian E. Granger
|
r4220 | |||
The message format used by this function internally is as follows: | ||||
[ident1,ident2,...,DELIM,HMAC,p_header,p_parent,p_content, | ||||
Bernardo B. Marques
|
r4872 | buffer1,buffer2,...] | ||
Brian E. Granger
|
r4220 | |||
Brian E. Granger
|
r4232 | The serialize/unserialize methods convert the nested message dict into this | ||
Brian E. Granger
|
r4220 | format. | ||
MinRK
|
r3271 | Parameters | ||
---------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | stream : zmq.Socket or ZMQStream | ||
Brian E. Granger
|
r4232 | The socket-like object used to send the data. | ||
MinRK
|
r4006 | msg_or_type : str or Message/dict | ||
Bernardo B. Marques
|
r4872 | Normally, msg_or_type will be a msg_type unless a message is being | ||
Brian E. Granger
|
r4234 | sent more than once. If a header is supplied, this can be set to | ||
None and the msg_type will be pulled from the header. | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | content : dict or None | ||
Brian E. Granger
|
r4232 | The content of the message (ignored if msg_or_type is a message). | ||
header : dict or None | ||||
Jason Grout
|
r7952 | The header dict for the message (ignored if msg_to_type is a message). | ||
MinRK
|
r4006 | parent : Message or dict or None | ||
Brian E. Granger
|
r4232 | The parent or parent header describing the parent of this message | ||
(ignored if msg_or_type is a message). | ||||
MinRK
|
r4006 | ident : bytes or list of bytes | ||
Brian E. Granger
|
r4232 | The zmq.IDENTITY routing path. | ||
Jason Grout
|
r7952 | metadata : dict or None | ||
The metadata describing the message | ||||
MinRK
|
r4006 | buffers : list or None | ||
Brian E. Granger
|
r4232 | The already-serialized buffers to be appended to the message. | ||
MinRK
|
r4006 | track : bool | ||
Brian E. Granger
|
r4232 | Whether to track. Only for use with Sockets, because ZMQStream | ||
objects cannot track messages. | ||||
Jason Grout
|
r7952 | |||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3271 | Returns | ||
------- | ||||
Brian E. Granger
|
r4232 | msg : dict | ||
The constructed message. | ||||
MinRK
|
r3271 | """ | ||
MinRK
|
r4006 | |||
if not isinstance(stream, (zmq.Socket, ZMQStream)): | ||||
raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream)) | ||||
elif track and isinstance(stream, ZMQStream): | ||||
raise TypeError("ZMQStream cannot track messages") | ||||
Bernardo B. Marques
|
r4872 | |||
Brian Granger
|
r3290 | if isinstance(msg_or_type, (Message, dict)): | ||
Brian E. Granger
|
r4232 | # We got a Message or message dict, not a msg_type so don't | ||
# build a new Message. | ||||
MinRK
|
r4006 | msg = msg_or_type | ||
MinRK
|
r3269 | else: | ||
Bernardo B. Marques
|
r4872 | msg = self.msg(msg_or_type, content=content, parent=parent, | ||
MinRK
|
r7956 | header=header, metadata=metadata) | ||
Brian E. Granger
|
r4232 | |||
MinRK
|
r4006 | buffers = [] if buffers is None else buffers | ||
to_send = self.serialize(msg, ident) | ||||
MinRK
|
r7968 | to_send.extend(buffers) | ||
longest = max([ len(s) for s in to_send ]) | ||||
MinRK
|
r7973 | copy = (longest < self.copy_threshold) | ||
MinRK
|
r7968 | |||
if buffers and track and not copy: | ||||
# only really track when we are doing zero-copy buffers | ||||
tracker = stream.send_multipart(to_send, copy=False, track=True) | ||||
MinRK
|
r4006 | else: | ||
MinRK
|
r7968 | # use dummy tracker, which will be done immediately | ||
tracker = DONE | ||||
stream.send_multipart(to_send, copy=copy) | ||||
MinRK
|
r4006 | if self.debug: | ||
pprint.pprint(msg) | ||||
pprint.pprint(to_send) | ||||
pprint.pprint(buffers) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | msg['tracker'] = tracker | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | return msg | ||
Brian E. Granger
|
r4232 | |||
Brian E. Granger
|
r4220 | def send_raw(self, stream, msg_list, flags=0, copy=True, ident=None): | ||
MinRK
|
r4006 | """Send a raw message via ident path. | ||
Brian E. Granger
|
r4220 | |||
This method is used to send a already serialized message. | ||||
MinRK
|
r4006 | Parameters | ||
---------- | ||||
Brian E. Granger
|
r4220 | stream : ZMQStream or Socket | ||
The ZMQ stream or socket to use for sending the message. | ||||
msg_list : list | ||||
The serialized list of messages to send. This only includes the | ||||
Jason Grout
|
r7955 | [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of | ||
Brian E. Granger
|
r4220 | the message. | ||
ident : ident or list | ||||
A single ident or a list of idents to use in sending. | ||||
""" | ||||
MinRK
|
r4006 | to_send = [] | ||
if isinstance(ident, bytes): | ||||
ident = [ident] | ||||
if ident is not None: | ||||
to_send.extend(ident) | ||||
Brian E. Granger
|
r4232 | |||
MinRK
|
r4006 | to_send.append(DELIM) | ||
Brian E. Granger
|
r4220 | to_send.append(self.sign(msg_list)) | ||
to_send.extend(msg_list) | ||||
stream.send_multipart(msg_list, flags, copy=copy) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True): | ||
Brian E. Granger
|
r4220 | """Receive and unpack a message. | ||
Parameters | ||||
---------- | ||||
socket : ZMQStream or Socket | ||||
The socket or stream to use in receiving. | ||||
Returns | ||||
------- | ||||
[idents], msg | ||||
[idents] is a list of idents and msg is a nested message dict of | ||||
same format as self.msg returns. | ||||
""" | ||||
MinRK
|
r4006 | if isinstance(socket, ZMQStream): | ||
socket = socket.socket | ||||
Fernando Perez
|
r2597 | try: | ||
MinRK
|
r6786 | msg_list = socket.recv_multipart(mode, copy=copy) | ||
MinRK
|
r4006 | except zmq.ZMQError as e: | ||
Fernando Perez
|
r2597 | if e.errno == zmq.EAGAIN: | ||
# We can convert EAGAIN to None as we know in this case | ||||
MinRK
|
r4006 | # recv_multipart won't return None. | ||
MinRK
|
r3269 | return None,None | ||
Fernando Perez
|
r2597 | else: | ||
raise | ||||
MinRK
|
r4017 | # split multipart message into identity list and message dict | ||
# invalid large messages can cause very expensive string comparisons | ||||
Brian E. Granger
|
r4220 | idents, msg_list = self.feed_identities(msg_list, copy) | ||
MinRK
|
r4006 | try: | ||
Brian E. Granger
|
r4231 | return idents, self.unserialize(msg_list, content=content, copy=copy) | ||
MinRK
|
r4006 | except Exception as e: | ||
# TODO: handle it | ||||
raise e | ||||
Bernardo B. Marques
|
r4872 | |||
Brian E. Granger
|
r4220 | def feed_identities(self, msg_list, copy=True): | ||
"""Split the identities from the rest of the message. | ||||
Feed until DELIM is reached, then return the prefix as idents and | ||||
remainder as msg_list. This is easily broken by setting an IDENT to DELIM, | ||||
MinRK
|
r4017 | but that would be silly. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | Parameters | ||
---------- | ||||
Brian E. Granger
|
r4220 | msg_list : a list of Message or bytes objects | ||
The message to be split. | ||||
MinRK
|
r4006 | copy : bool | ||
flag determining whether the arguments are bytes or Messages | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4006 | Returns | ||
------- | ||||
Brian E. Granger
|
r4231 | (idents, msg_list) : two lists | ||
idents will always be a list of bytes, each of which is a ZMQ | ||||
identity. msg_list will be a list of bytes or zmq.Messages of the | ||||
form [HMAC,p_header,p_parent,p_content,buffer1,buffer2,...] and | ||||
should be unpackable/unserializable via self.unserialize at this | ||||
point. | ||||
MinRK
|
r4006 | """ | ||
if copy: | ||||
Brian E. Granger
|
r4220 | idx = msg_list.index(DELIM) | ||
return msg_list[:idx], msg_list[idx+1:] | ||||
MinRK
|
r4006 | else: | ||
failed = True | ||||
Brian E. Granger
|
r4220 | for idx,m in enumerate(msg_list): | ||
MinRK
|
r4006 | if m.bytes == DELIM: | ||
failed = False | ||||
break | ||||
if failed: | ||||
Brian E. Granger
|
r4220 | raise ValueError("DELIM not in msg_list") | ||
idents, msg_list = msg_list[:idx], msg_list[idx+1:] | ||||
return [m.bytes for m in idents], msg_list | ||||
Bernardo B. Marques
|
r4872 | |||
Brian E. Granger
|
r4231 | def unserialize(self, msg_list, content=True, copy=True): | ||
"""Unserialize a msg_list to a nested message dict. | ||||
This is roughly the inverse of serialize. The serialize/unserialize | ||||
methods work with full message lists, whereas pack/unpack work with | ||||
the individual message parts in the message list. | ||||
MinRK
|
r4006 | Parameters: | ||
----------- | ||||
Brian E. Granger
|
r4231 | msg_list : list of bytes or Message objects | ||
The list of message parts of the form [HMAC,p_header,p_parent, | ||||
Jason Grout
|
r7955 | p_metadata,p_content,buffer1,buffer2,...]. | ||
MinRK
|
r4006 | content : bool (True) | ||
Brian E. Granger
|
r4231 | Whether to unpack the content dict (True), or leave it packed | ||
(False). | ||||
MinRK
|
r4006 | copy : bool (True) | ||
Brian E. Granger
|
r4231 | Whether to return the bytes (True), or the non-copying Message | ||
object in each place (False). | ||||
Returns | ||||
------- | ||||
msg : dict | ||||
The nested message dict with top-level keys [header, parent_header, | ||||
content, buffers]. | ||||
MinRK
|
r4006 | """ | ||
Jason Grout
|
r7955 | minlen = 5 | ||
MinRK
|
r4006 | message = {} | ||
if not copy: | ||||
for i in range(minlen): | ||||
Brian E. Granger
|
r4220 | msg_list[i] = msg_list[i].bytes | ||
MinRK
|
r4006 | if self.auth is not None: | ||
Brian E. Granger
|
r4220 | signature = msg_list[0] | ||
MinRK
|
r4522 | if not signature: | ||
raise ValueError("Unsigned Message") | ||||
MinRK
|
r4006 | if signature in self.digest_history: | ||
raise ValueError("Duplicate Signature: %r"%signature) | ||||
self.digest_history.add(signature) | ||||
MinRK
|
r7956 | check = self.sign(msg_list[1:5]) | ||
MinRK
|
r4006 | if not signature == check: | ||
MinRK
|
r7956 | raise ValueError("Invalid Signature: %r" % signature) | ||
Brian E. Granger
|
r4220 | if not len(msg_list) >= minlen: | ||
MinRK
|
r4006 | raise TypeError("malformed message, must have at least %i elements"%minlen) | ||
MinRK
|
r4711 | header = self.unpack(msg_list[1]) | ||
message['header'] = header | ||||
message['msg_id'] = header['msg_id'] | ||||
message['msg_type'] = header['msg_type'] | ||||
Brian E. Granger
|
r4220 | message['parent_header'] = self.unpack(msg_list[2]) | ||
Jason Grout
|
r7955 | message['metadata'] = self.unpack(msg_list[3]) | ||
MinRK
|
r4006 | if content: | ||
Jason Grout
|
r7955 | message['content'] = self.unpack(msg_list[4]) | ||
MinRK
|
r3269 | else: | ||
Jason Grout
|
r7955 | message['content'] = msg_list[4] | ||
Bernardo B. Marques
|
r4872 | |||
Jason Grout
|
r7955 | message['buffers'] = msg_list[5:] | ||
MinRK
|
r4006 | return message | ||
Fernando Perez
|
r2597 | |||
def test_msg2obj(): | ||||
am = dict(x=1) | ||||
ao = Message(am) | ||||
assert ao.x == am['x'] | ||||
am['y'] = dict(z=1) | ||||
ao = Message(am) | ||||
assert ao.y.z == am['y']['z'] | ||||
Bernardo B. Marques
|
r4872 | |||
Fernando Perez
|
r2597 | k1, k2 = 'y', 'z' | ||
assert ao[k1][k2] == am[k1][k2] | ||||
Bernardo B. Marques
|
r4872 | |||
Fernando Perez
|
r2597 | am2 = dict(ao) | ||
assert am['x'] == am2['x'] | ||||
assert am['y']['z'] == am2['y']['z'] | ||||
MinRK
|
r4017 | |||