##// END OF EJS Templates
tweak dagdeps for new AsyncResult objects
tweak dagdeps for new AsyncResult objects

File last commit:

r3573:0bea1480
r3606:9f1a03ab
Show More
test_streamsession.py
82 lines | 2.7 KiB | text/x-python | PythonLexer
import os
import uuid
import zmq
from zmq.tests import BaseZMQTestCase
# from IPython.zmq.tests import SessionTestCase
from IPython.zmq.parallel import streamsession as ss
class SessionTestCase(BaseZMQTestCase):
def setUp(self):
BaseZMQTestCase.setUp(self)
self.session = ss.StreamSession()
class TestSession(SessionTestCase):
def test_msg(self):
"""message format"""
msg = self.session.msg('execute')
thekeys = set('header msg_id parent_header msg_type content'.split())
s = set(msg.keys())
self.assertEquals(s, thekeys)
self.assertTrue(isinstance(msg['content'],dict))
self.assertTrue(isinstance(msg['header'],dict))
self.assertTrue(isinstance(msg['parent_header'],dict))
self.assertEquals(msg['msg_type'], 'execute')
def test_args(self):
"""initialization arguments for StreamSession"""
s = ss.StreamSession()
self.assertTrue(s.pack is ss.default_packer)
self.assertTrue(s.unpack is ss.default_unpacker)
self.assertEquals(s.username, os.environ.get('USER', 'username'))
s = ss.StreamSession(username=None)
self.assertEquals(s.username, os.environ.get('USER', 'username'))
self.assertRaises(TypeError, ss.StreamSession, packer='hi')
self.assertRaises(TypeError, ss.StreamSession, unpacker='hi')
u = str(uuid.uuid4())
s = ss.StreamSession(username='carrot', session=u)
self.assertEquals(s.session, u)
self.assertEquals(s.username, 'carrot')
def test_rekey(self):
"""rekeying dict around json str keys"""
d = {'0': uuid.uuid4(), 0:uuid.uuid4()}
self.assertRaises(KeyError, ss.rekey, d)
d = {'0': uuid.uuid4(), 1:uuid.uuid4(), 'asdf':uuid.uuid4()}
d2 = {0:d['0'],1:d[1],'asdf':d['asdf']}
rd = ss.rekey(d)
self.assertEquals(d2,rd)
d = {'1.5':uuid.uuid4(),'1':uuid.uuid4()}
d2 = {1.5:d['1.5'],1:d['1']}
rd = ss.rekey(d)
self.assertEquals(d2,rd)
d = {'1.0':uuid.uuid4(),'1':uuid.uuid4()}
self.assertRaises(KeyError, ss.rekey, d)
def test_unique_msg_ids(self):
"""test that messages receive unique ids"""
ids = set()
for i in range(2**12):
h = self.session.msg_header('test')
msg_id = h['msg_id']
self.assertTrue(msg_id not in ids)
ids.add(msg_id)
def test_feed_identities(self):
"""scrub the front for zmq IDENTITIES"""
theids = "engine client other".split()
content = dict(code='whoda',stuff=object())
themsg = self.session.msg('execute',content=content)
pmsg = theids