test_session.py
318 lines
| 12.4 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3970 | """test building messages with streamsession""" | ||
#-------------------------------------------------------------------------------
#  Copyright (C) 2011  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import os | ||||
import uuid | ||||
MinRK
|
r13523 | from datetime import datetime | ||
MinRK
|
r3970 | import zmq | ||
from zmq.tests import BaseZMQTestCase | ||||
from zmq.eventloop.zmqstream import ZMQStream | ||||
MinRK
|
r4006 | |||
MinRK
|
r9372 | from IPython.kernel.zmq import session as ss | ||
MinRK
|
r3970 | |||
MinRK
|
r13523 | from IPython.testing.decorators import skipif, module_not_available | ||
from IPython.utils.py3compat import string_types | ||||
from IPython.utils import jsonutil | ||||
MinRK
|
r12773 | def _bad_packer(obj): | ||
raise TypeError("I don't work") | ||||
def _bad_unpacker(bytes): | ||||
raise TypeError("I don't work either") | ||||
MinRK
|
class SessionTestCase(BaseZMQTestCase):
    """Base case that gives every test a fresh default ``ss.Session``."""

    def setUp(self):
        """Run the ZMQ base-class setup, then create ``self.session``."""
        super(SessionTestCase, self).setUp()
        self.session = ss.Session()
MinRK
|
r3970 | |||
Brian E. Granger
|
r4234 | |||
MinRK
|
class TestSession(SessionTestCase):
    """Tests for ``ss.Session``: message layout, (de)serialization, sending
    over sockets, constructor validation, and digest-history handling.
    """

    # Message fields compared when checking that a message survived a
    # serialize/send -> receive/unserialize round trip.
    _ROUNDTRIP_FIELDS = ('msg_id', 'msg_type', 'header', 'content',
                         'parent_header', 'metadata')

    # ------------------------------------------------------------------
    # private helpers (not collected as tests: names don't start with "test")
    # ------------------------------------------------------------------

    def _assert_fields_match(self, received, sent, fields=_ROUNDTRIP_FIELDS):
        """Assert ``received[f] == sent[f]`` for every ``f`` in *fields*."""
        for field in fields:
            self.assertEqual(received[field], sent[field])

    def _assert_session_rejects(self, fragments, **session_kwargs):
        """Assert ``ss.Session(**session_kwargs)`` raises ``ValueError``.

        Every string in *fragments* must appear in the error message.
        """
        with self.assertRaises(ValueError) as caught:
            ss.Session(**session_kwargs)
        text = str(caught.exception)
        for fragment in fragments:
            self.assertIn(fragment, text)

    def _datetime_test(self, session):
        """Round-trip datetimes through *session* and check how they come back.

        The header date must come back as a ``datetime``; datetimes placed in
        ``content`` and ``metadata`` must come back as strings that
        ``jsonutil.extract_dates`` turns back into the original values.
        """
        content = dict(t=datetime.now())
        metadata = dict(t=datetime.now())
        p = session.msg('msg')
        msg = session.msg('msg', content=content, metadata=metadata,
                          parent=p['header'])
        smsg = session.serialize(msg)
        msg2 = session.unserialize(session.feed_identities(smsg)[1])

        # unittest assertions instead of bare ``assert`` so the checks are
        # not stripped when Python runs with -O.
        self.assertIsInstance(msg2['header']['date'], datetime)
        self.assertEqual(msg['header'], msg2['header'])
        self.assertEqual(msg['parent_header'], msg2['parent_header'])

        self.assertIsInstance(msg['content']['t'], datetime)
        self.assertIsInstance(msg['metadata']['t'], datetime)
        self.assertIsInstance(msg2['content']['t'], string_types)
        self.assertIsInstance(msg2['metadata']['t'], string_types)

        self.assertEqual(msg['content'],
                         jsonutil.extract_dates(msg2['content']))
        # Previously ``content`` was compared twice and ``metadata`` never.
        self.assertEqual(msg['metadata'],
                         jsonutil.extract_dates(msg2['metadata']))

    # ------------------------------------------------------------------
    # message construction / serialization
    # ------------------------------------------------------------------

    def test_msg(self):
        """message format"""
        msg = self.session.msg('execute')
        expected_keys = set(
            'header parent_header metadata content msg_type msg_id'.split())
        self.assertEqual(set(msg.keys()), expected_keys)
        for key in ('content', 'metadata', 'header', 'parent_header'):
            self.assertIsInstance(msg[key], dict)
        for key in ('msg_id', 'msg_type'):
            self.assertIsInstance(msg[key], str)
        self.assertEqual(msg['header']['msg_type'], 'execute')
        self.assertEqual(msg['msg_type'], 'execute')

    def test_serialize(self):
        """serialize -> feed_identities -> unserialize preserves the message"""
        msg = self.session.msg('execute', content=dict(a=10, b=1.1))
        msg_list = self.session.serialize(msg, ident=b'foo')
        ident, msg_list = self.session.feed_identities(msg_list)
        new_msg = self.session.unserialize(msg_list)
        self.assertEqual(ident[0], b'foo')
        self._assert_fields_match(new_msg, msg)
        # ensure floats don't come out as Decimal: compare against the type
        # that was sent (the old check compared the received value's type
        # with itself and therefore could never fail).
        self.assertEqual(type(new_msg['content']['b']),
                         type(msg['content']['b']))

    # ------------------------------------------------------------------
    # sending over sockets
    # ------------------------------------------------------------------

    def test_send(self):
        """send a message three ways over an inproc PAIR and compare results"""
        ctx = zmq.Context.instance()
        A = ctx.socket(zmq.PAIR)
        B = ctx.socket(zmq.PAIR)
        try:
            A.bind("inproc://test")
            B.connect("inproc://test")

            msg = self.session.msg('execute', content=dict(a=10))

            # 1) send a prebuilt message, receive the raw frames
            self.session.send(A, msg, ident=b'foo', buffers=[b'bar'])
            ident, msg_list = self.session.feed_identities(
                B.recv_multipart())
            new_msg = self.session.unserialize(msg_list)
            self.assertEqual(ident[0], b'foo')
            self._assert_fields_match(new_msg, msg)
            self.assertEqual(new_msg['buffers'], [b'bar'])

            # 2) send from the individual parts instead of a message dict
            content = msg['content']
            header = msg['header']
            parent = msg['parent_header']
            metadata = msg['metadata']
            self.session.send(A, None, content=content, parent=parent,
                              header=header, metadata=metadata,
                              ident=b'foo', buffers=[b'bar'])
            ident, msg_list = self.session.feed_identities(
                B.recv_multipart())
            new_msg = self.session.unserialize(msg_list)
            self.assertEqual(ident[0], b'foo')
            self._assert_fields_match(new_msg, msg)
            self.assertEqual(new_msg['buffers'], [b'bar'])

            # 3) receive through Session.recv instead of unpacking by hand
            self.session.send(A, msg, ident=b'foo', buffers=[b'bar'])
            ident, new_msg = self.session.recv(B)
            self.assertEqual(ident[0], b'foo')
            self._assert_fields_match(new_msg, msg)
            self.assertEqual(new_msg['buffers'], [b'bar'])
        finally:
            # Always release the sockets, even when an assertion fails, so a
            # failing test does not leak them. The context is terminated
            # afterwards, as the test has always done.
            # NOTE(review): this terminates the shared Context.instance();
            # confirm no other test relies on it staying alive.
            A.close()
            B.close()
            ctx.term()

    def test_send_raw(self):
        """send_raw ships pre-packed frames that unserialize to the message"""
        ctx = zmq.Context.instance()
        A = ctx.socket(zmq.PAIR)
        B = ctx.socket(zmq.PAIR)
        try:
            A.bind("inproc://test")
            B.connect("inproc://test")

            msg = self.session.msg('execute', content=dict(a=10))
            msg_list = [self.session.pack(msg[part]) for part in
                        ['header', 'parent_header', 'metadata', 'content']]
            self.session.send_raw(A, msg_list, ident=b'foo')

            ident, new_msg_list = self.session.feed_identities(
                B.recv_multipart())
            new_msg = self.session.unserialize(new_msg_list)
            self.assertEqual(ident[0], b'foo')
            self._assert_fields_match(
                new_msg, msg,
                fields=('msg_type', 'header', 'parent_header',
                        'content', 'metadata'))
        finally:
            # Same cleanup as test_send: never leave the sockets open.
            A.close()
            B.close()
            ctx.term()

    def test_tracking(self):
        """test tracking messages"""
        a, b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
        s = self.session
        # NOTE(review): presumably lowers the size above which sends are
        # zero-copy so that tracking applies to these tiny messages -- confirm
        # against Session.copy_threshold.
        s.copy_threshold = 1
        # Not used below; kept because constructing the stream was part of
        # the original test.
        stream = ZMQStream(a)

        msg = s.send(a, 'hello', track=False)
        self.assertIs(msg['tracker'], ss.DONE)

        msg = s.send(a, 'hello', track=True)
        self.assertIsInstance(msg['tracker'], zmq.MessageTracker)

        M = zmq.Message(b'hi there', track=True)
        msg = s.send(a, 'hello', buffers=[M], track=True)
        t = msg['tracker']
        self.assertIsInstance(t, zmq.MessageTracker)
        # While we still hold M the tracker must not report completion.
        self.assertRaises(zmq.NotDone, t.wait, .1)
        del M
        # Expected to return normally now; raises zmq.NotDone (failing the
        # test) only if the tracker is still not done after one second.
        t.wait(1)

    # ------------------------------------------------------------------
    # constructor arguments and identifiers
    # ------------------------------------------------------------------

    def test_args(self):
        """initialization arguments for Session"""
        s = self.session
        self.assertIs(s.pack, ss.default_packer)
        self.assertIs(s.unpack, ss.default_unpacker)
        self.assertEqual(s.username, os.environ.get('USER', u'username'))

        s = ss.Session()
        self.assertEqual(s.username, os.environ.get('USER', u'username'))

        # non-callable pack/unpack values are rejected
        self.assertRaises(TypeError, ss.Session, pack='hi')
        self.assertRaises(TypeError, ss.Session, unpack='hi')

        u = str(uuid.uuid4())
        s = ss.Session(username=u'carrot', session=u)
        self.assertEqual(s.session, u)
        self.assertEqual(s.username, u'carrot')

    def test_unique_msg_ids(self):
        """test that messages receive unique ids"""
        ids = set()
        for _ in range(2**12):
            h = self.session.msg_header('test')
            msg_id = h['msg_id']
            self.assertNotIn(msg_id, ids)
            ids.add(msg_id)

    def test_feed_identities(self):
        """scrub the front for zmq IDENTITIES"""
        # TODO(review): this test builds its inputs but never calls
        # feed_identities or asserts anything; it only checks that building
        # the message does not raise. The intended assertions are unknown.
        theids = "engine client other".split()
        content = dict(code='whoda', stuff=object())
        themsg = self.session.msg('execute', content=content)
        pmsg = theids

    def test_session_id(self):
        """``session`` and ``bsession`` stay in sync in either access order"""
        session = ss.Session()
        # get bs before us
        bs = session.bsession
        us = session.session
        self.assertEqual(us.encode('ascii'), bs)

        session = ss.Session()
        # get us before bs
        us = session.session
        bs = session.bsession
        self.assertEqual(us.encode('ascii'), bs)

        # change propagates:
        session.session = 'something else'
        bs = session.bsession
        us = session.session
        self.assertEqual(us.encode('ascii'), bs)

        # explicit session id given to the constructor
        session = ss.Session(session='stuff')
        self.assertEqual(session.bsession, session.session.encode('ascii'))
        self.assertEqual(b'stuff', session.bsession)

    # ------------------------------------------------------------------
    # digest history
    # ------------------------------------------------------------------

    def test_zero_digest_history(self):
        """with digest_history_size=0 no digests are retained"""
        session = ss.Session(digest_history_size=0)
        for _ in range(11):
            session._add_digest(uuid.uuid4().bytes)
        self.assertEqual(len(session.digest_history), 0)

    def test_cull_digest_history(self):
        """exceeding digest_history_size culls the history (100 -> 91 here)"""
        session = ss.Session(digest_history_size=100)
        for _ in range(100):
            session._add_digest(uuid.uuid4().bytes)
        self.assertEqual(len(session.digest_history), 100)
        # one past the limit triggers a cull
        session._add_digest(uuid.uuid4().bytes)
        self.assertEqual(len(session.digest_history), 91)
        for _ in range(9):
            session._add_digest(uuid.uuid4().bytes)
        self.assertEqual(len(session.digest_history), 100)
        session._add_digest(uuid.uuid4().bytes)
        self.assertEqual(len(session.digest_history), 91)

    # ------------------------------------------------------------------
    # packer / unpacker validation
    # ------------------------------------------------------------------

    def test_bad_pack(self):
        """a failing ``pack`` callable is reported as ValueError"""
        self._assert_session_rejects(
            ("could not serialize", "don't work"),
            pack=_bad_packer)

    def test_bad_unpack(self):
        """a failing ``unpack`` callable is reported as ValueError"""
        self._assert_session_rejects(
            ("could not handle output", "don't work either"),
            unpack=_bad_unpacker)

    def test_bad_packer(self):
        """a failing packer given by import string is reported as ValueError"""
        self._assert_session_rejects(
            ("could not serialize", "don't work"),
            packer=__name__ + '._bad_packer')

    def test_bad_unpacker(self):
        """a failing unpacker given by import string is reported as ValueError"""
        self._assert_session_rejects(
            ("could not handle output", "don't work either"),
            unpacker=__name__ + '._bad_unpacker')

    def test_bad_roundtrip(self):
        """an unpacker that does not invert the packer is rejected"""
        with self.assertRaises(ValueError):
            ss.Session(unpack=lambda b: 5)

    # ------------------------------------------------------------------
    # datetime handling with different packers
    # ------------------------------------------------------------------

    def test_datetimes(self):
        """datetime round trip with the default packer"""
        self._datetime_test(self.session)

    def test_datetimes_pickle(self):
        """datetime round trip with the pickle packer"""
        session = ss.Session(packer='pickle')
        self._datetime_test(session)

    @skipif(module_not_available('msgpack'))
    def test_datetimes_msgpack(self):
        """datetime round trip with msgpack (skipped when not installed)"""
        import msgpack

        # NOTE(review): ``encoding=`` is not accepted by every msgpack
        # release; confirm against the msgpack versions this suite supports.
        session = ss.Session(
            pack=msgpack.packb,
            unpack=lambda buf: msgpack.unpackb(buf, encoding='utf8'),
        )
        self._datetime_test(session)