test_streamsession.py
99 lines
| 3.5 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | |||
import os | ||||
import uuid | ||||
import zmq | ||||
from zmq.tests import BaseZMQTestCase | ||||
MinRK
|
r3654 | from zmq.eventloop.zmqstream import ZMQStream | ||
MinRK
|
r3573 | # from IPython.zmq.tests import SessionTestCase | ||
from IPython.zmq.parallel import streamsession as ss | ||||
MinRK
|
r3539 | |||
class SessionTestCase(BaseZMQTestCase): | ||||
def setUp(self): | ||||
BaseZMQTestCase.setUp(self) | ||||
self.session = ss.StreamSession() | ||||
class TestSession(SessionTestCase): | ||||
def test_msg(self): | ||||
"""message format""" | ||||
msg = self.session.msg('execute') | ||||
thekeys = set('header msg_id parent_header msg_type content'.split()) | ||||
s = set(msg.keys()) | ||||
self.assertEquals(s, thekeys) | ||||
self.assertTrue(isinstance(msg['content'],dict)) | ||||
self.assertTrue(isinstance(msg['header'],dict)) | ||||
self.assertTrue(isinstance(msg['parent_header'],dict)) | ||||
self.assertEquals(msg['msg_type'], 'execute') | ||||
def test_args(self): | ||||
"""initialization arguments for StreamSession""" | ||||
MinRK
|
r3654 | s = self.session | ||
MinRK
|
r3539 | self.assertTrue(s.pack is ss.default_packer) | ||
self.assertTrue(s.unpack is ss.default_unpacker) | ||||
self.assertEquals(s.username, os.environ.get('USER', 'username')) | ||||
s = ss.StreamSession(username=None) | ||||
self.assertEquals(s.username, os.environ.get('USER', 'username')) | ||||
self.assertRaises(TypeError, ss.StreamSession, packer='hi') | ||||
self.assertRaises(TypeError, ss.StreamSession, unpacker='hi') | ||||
u = str(uuid.uuid4()) | ||||
s = ss.StreamSession(username='carrot', session=u) | ||||
self.assertEquals(s.session, u) | ||||
self.assertEquals(s.username, 'carrot') | ||||
MinRK
|
r3654 | def test_tracking(self): | ||
"""test tracking messages""" | ||||
a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR) | ||||
s = self.session | ||||
stream = ZMQStream(a) | ||||
msg = s.send(a, 'hello', track=False) | ||||
self.assertTrue(msg['tracker'] is None) | ||||
msg = s.send(a, 'hello', track=True) | ||||
self.assertTrue(isinstance(msg['tracker'], zmq.MessageTracker)) | ||||
M = zmq.Message(b'hi there', track=True) | ||||
msg = s.send(a, 'hello', buffers=[M], track=True) | ||||
t = msg['tracker'] | ||||
self.assertTrue(isinstance(t, zmq.MessageTracker)) | ||||
self.assertRaises(zmq.NotDone, t.wait, .1) | ||||
del M | ||||
t.wait(1) # this will raise | ||||
MinRK
|
r3644 | # def test_rekey(self): | ||
# """rekeying dict around json str keys""" | ||||
# d = {'0': uuid.uuid4(), 0:uuid.uuid4()} | ||||
# self.assertRaises(KeyError, ss.rekey, d) | ||||
# | ||||
# d = {'0': uuid.uuid4(), 1:uuid.uuid4(), 'asdf':uuid.uuid4()} | ||||
# d2 = {0:d['0'],1:d[1],'asdf':d['asdf']} | ||||
# rd = ss.rekey(d) | ||||
# self.assertEquals(d2,rd) | ||||
# | ||||
# d = {'1.5':uuid.uuid4(),'1':uuid.uuid4()} | ||||
# d2 = {1.5:d['1.5'],1:d['1']} | ||||
# rd = ss.rekey(d) | ||||
# self.assertEquals(d2,rd) | ||||
# | ||||
# d = {'1.0':uuid.uuid4(),'1':uuid.uuid4()} | ||||
# self.assertRaises(KeyError, ss.rekey, d) | ||||
# | ||||
MinRK
|
r3539 | def test_unique_msg_ids(self): | ||
"""test that messages receive unique ids""" | ||||
ids = set() | ||||
for i in range(2**12): | ||||
h = self.session.msg_header('test') | ||||
msg_id = h['msg_id'] | ||||
self.assertTrue(msg_id not in ids) | ||||
ids.add(msg_id) | ||||
def test_feed_identities(self): | ||||
"""scrub the front for zmq IDENTITIES""" | ||||
theids = "engine client other".split() | ||||
content = dict(code='whoda',stuff=object()) | ||||
themsg = self.session.msg('execute',content=content) | ||||
pmsg = theids | ||||