##// END OF EJS Templates
Simplified output stream management in ultratb....
Simplified output stream management in ultratb. This lets us have one less hack in the testing globalipapp code, as well as rationalizing the ultratb code a little more. There is now an ostream property for all output that can be used consistently.

File last commit:

r2606:f0587bad
r2852:6c926026
Show More
session.py
122 lines | 3.2 KiB | text/x-python | PythonLexer
import os
import uuid
import pprint
import zmq
class Message(object):
"""A simple message object that maps dict keys to attributes.
A Message can be created from a dict and a dict from a Message instance
simply by calling dict(msg_obj)."""
def __init__(self, msg_dict):
dct = self.__dict__
for k, v in msg_dict.iteritems():
if isinstance(v, dict):
v = Message(v)
dct[k] = v
# Having this iterator lets dict(msg_obj) work out of the box.
def __iter__(self):
return iter(self.__dict__.iteritems())
def __repr__(self):
return repr(self.__dict__)
def __str__(self):
return pprint.pformat(self.__dict__)
def __contains__(self, k):
return k in self.__dict__
def __getitem__(self, k):
return self.__dict__[k]
def msg_header(msg_id, username, session):
return {
'msg_id' : msg_id,
'username' : username,
'session' : session
}
def extract_header(msg_or_header):
"""Given a message or header, return the header."""
if not msg_or_header:
return {}
try:
# See if msg_or_header is the entire message.
h = msg_or_header['header']
except KeyError:
try:
# See if msg_or_header is just the header
h = msg_or_header['msg_id']
except KeyError:
raise
else:
h = msg_or_header
if not isinstance(h, dict):
h = dict(h)
return h
class Session(object):
def __init__(self, username=os.environ.get('USER','username'), session=None):
self.username = username
if session is None:
self.session = str(uuid.uuid4())
else:
self.session = session
self.msg_id = 0
def msg_header(self):
h = msg_header(self.msg_id, self.username, self.session)
self.msg_id += 1
return h
def msg(self, msg_type, content=None, parent=None):
msg = {}
msg['header'] = self.msg_header()
msg['parent_header'] = {} if parent is None else extract_header(parent)
msg['msg_type'] = msg_type
msg['content'] = {} if content is None else content
return msg
def send(self, socket, msg_type, content=None, parent=None, ident=None):
msg = self.msg(msg_type, content, parent)
if ident is not None:
socket.send(ident, zmq.SNDMORE)
socket.send_json(msg)
omsg = Message(msg)
return omsg
def recv(self, socket, mode=zmq.NOBLOCK):
try:
msg = socket.recv_json(mode)
except zmq.ZMQError, e:
if e.errno == zmq.EAGAIN:
# We can convert EAGAIN to None as we know in this case
# recv_json won't return None.
return None
else:
raise
return Message(msg)
def test_msg2obj():
am = dict(x=1)
ao = Message(am)
assert ao.x == am['x']
am['y'] = dict(z=1)
ao = Message(am)
assert ao.y.z == am['y']['z']
k1, k2 = 'y', 'z'
assert ao[k1][k2] == am[k1][k2]
am2 = dict(ao)
assert am['x'] == am2['x']
assert am['y']['z'] == am2['y']['z']