##// END OF EJS Templates
add detail_level to object_info requests...
add detail_level to object_info requests as described in message spec

File last commit:

r6554:1b182311
r6556:3921cca8
Show More
test_message_spec.py
374 lines | 9.6 KiB | text/x-python | PythonLexer
/ IPython / zmq / tests / test_message_spec.py
"""Test suite for our zeromq-based messaging specification.
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING.txt, distributed as part of this software.
#-----------------------------------------------------------------------------
import re
import sys
import time
from subprocess import PIPE
from Queue import Empty
import nose.tools as nt
from ..blockingkernelmanager import BlockingKernelManager
from IPython.utils import io
from IPython.utils.traitlets import (
HasTraits, TraitError, Bool, Unicode, Dict, Integer, List, Enum,
)
#-----------------------------------------------------------------------------
# Global setup and utilities
#-----------------------------------------------------------------------------
def setup():
global KM
KM = BlockingKernelManager()
KM.start_kernel(stdout=PIPE, stderr=PIPE)
KM.start_channels()
def teardown():
KM.stop_channels()
KM.shutdown_kernel()
def flush_channels():
"""flush any messages waiting on the queue"""
for channel in (KM.shell_channel, KM.sub_channel):
for msg in channel.get_msgs():
validate_message(msg)
def flush(f):
"""decorator for flushing any incoming messages unhandled after the test"""
def wrapped(*args, **kwargs):
result = f(*args, **kwargs)
flush_channels()
return result
return wrapped
def flush_busy_pyin(msg_id=None):
"""flush status=busy / pyin messages"""
def execute(code='', **kwargs):
"""wrapper for doing common steps for validating an execution request"""
shell = KM.shell_channel
sub = KM.sub_channel
msg_id = shell.execute(code=code, **kwargs)
reply = shell.get_msg(timeout=2)
validate_message(reply, 'execute_reply', msg_id)
busy = sub.get_msg(timeout=2)
validate_message(busy, 'status', msg_id)
nt.assert_equals(busy['content']['execution_state'], 'busy')
if not kwargs.get('silent'):
pyin = sub.get_msg(timeout=2)
validate_message(pyin, 'pyin', msg_id)
nt.assert_equals(pyin['content']['code'], code)
return msg_id, reply['content']
#-----------------------------------------------------------------------------
# MSG Spec References
#-----------------------------------------------------------------------------
class Reference(HasTraits):
def check(self, d):
"""validate a dict against our traits"""
for key in self.trait_names():
nt.assert_true(key in d, "Missing key: %r, should be found in %s" % (key, d))
# FIXME: always allow None, probably not a good idea
if d[key] is None:
continue
try:
setattr(self, key, d[key])
except TraitError as e:
nt.assert_true(False, str(e))
class RMessage(Reference):
msg_id = Unicode()
msg_type = Unicode()
header = Dict()
parent_header = Dict()
content = Dict()
class RHeader(Reference):
msg_id = Unicode()
msg_type = Unicode()
session = Unicode()
username = Unicode()
class RContent(Reference):
status = Enum((u'ok', u'error'))
class ExecuteReply(Reference):
execution_count = Integer()
status = Enum((u'ok', u'error'))
def check(self, d):
Reference.check(self, d)
if d['status'] == 'ok':
ExecuteReplyOkay().check(d)
elif d['status'] == 'error':
ExecuteReplyError().check(d)
class ExecuteReplyOkay(Reference):
payload = List(Dict)
user_variables = Dict()
user_expressions = Dict()
class ExecuteReplyError(Reference):
ename = Unicode()
evalue = Unicode()
traceback = List(Unicode)
class OInfoReply(Reference):
name = Unicode()
found = Bool()
ismagic = Bool()
isalias = Bool()
namespace = Enum((u'builtin', u'magics', u'alias', u'Interactive'))
type_name = Unicode()
string_form = Unicode()
base_class = Unicode()
length = Integer()
file = Unicode()
definition = Unicode()
argspec = Dict()
init_definition = Unicode()
docstring = Unicode()
init_docstring = Unicode()
class_docstring = Unicode()
call_def = Unicode()
call_docstring = Unicode()
source = Unicode()
def check(self, d):
Reference.check(self, d)
if d['argspec'] is not None:
ArgSpec().check(d['argspec'])
class ArgSpec(Reference):
args = List(Unicode)
varargs = Unicode()
varkw = Unicode()
defaults = List(Unicode)
class Status(Reference):
execution_state = Enum((u'busy', u'idle'))
class CompleteReply(Reference):
matches = List(Unicode)
# IOPub messages
class PyIn(Reference):
code = Unicode()
PyErr = ExecuteReplyError
class Stream(Reference):
name = Enum((u'stdout', u'stderr'))
data = Unicode()
mime_pat = re.compile(r'\w+/\w+')
class DisplayData(Reference):
source = Unicode()
metadata = Dict()
data = Dict()
def _data_changed(self, name, old, new):
for k,v in new.iteritems():
nt.assert_true(mime_pat.match(k))
nt.assert_true(isinstance(v, basestring), "expected string data, got %r" % v)
references = {
'execute_reply' : ExecuteReply(),
'object_info_reply' : OInfoReply(),
'status' : Status(),
'complete_reply' : CompleteReply(),
'pyin' : PyIn(),
'pyerr' : PyErr(),
'stream' : Stream(),
'display_data' : DisplayData(),
}
def validate_message(msg, msg_type=None, parent=None):
"""validate a message"""
RMessage().check(msg)
if msg_type:
nt.assert_equals(msg['msg_type'], msg_type)
if parent:
nt.assert_equal(msg['parent_header']['msg_id'], parent)
content = msg['content']
ref = references[msg['msg_type']]
ref.check(content)
#-----------------------------------------------------------------------------
# Tests
#-----------------------------------------------------------------------------
# Shell channel
def test_execute():
shell = KM.shell_channel
msg_id = shell.execute(code='x=1')
reply = shell.get_msg(timeout=2)
validate_message(reply, 'execute_reply', msg_id)
flush_channels()
def test_execute_silent():
msg_id, reply = execute(code='x=1', silent=True)
# flush status=idle
status = KM.sub_channel.get_msg(timeout=2)
validate_message(status, 'status', msg_id)
nt.assert_equals(status['content']['execution_state'], 'idle')
nt.assert_raises(Empty, KM.sub_channel.get_msg, timeout=0.1)
count = reply['execution_count']
msg_id, reply = execute(code='x=2', silent=True)
# flush status=idle
status = KM.sub_channel.get_msg(timeout=2)
validate_message(status, 'status', msg_id)
nt.assert_equals(status['content']['execution_state'], 'idle')
nt.assert_raises(Empty, KM.sub_channel.get_msg, timeout=0.1)
count_2 = reply['execution_count']
nt.assert_equals(count_2, count)
flush_channels()
def test_execute_error():
msg_id, reply = execute(code='1/0')
nt.assert_equals(reply['status'], 'error')
nt.assert_equals(reply['ename'], 'ZeroDivisionError')
pyerr = KM.sub_channel.get_msg(timeout=2)
validate_message(pyerr, 'pyerr', msg_id)
flush_channels()
def test_execute_inc():
"""execute request should increment execution_count"""
msg_id, reply = execute(code='x=1')
count = reply['execution_count']
flush_channels()
msg_id, reply = execute(code='x=2')
count_2 = reply['execution_count']
nt.assert_equals(count_2, count+1)
flush_channels()
def test_user_variables():
msg_id, reply = execute(code='x=1', user_variables=['x'])
user_variables = reply['user_variables']
nt.assert_equals(user_variables, {u'x' : u'1'})
flush_channels()
def test_user_expressions():
msg_id, reply = execute(code='x=1', user_expressions=dict(foo='x+1'))
user_expressions = reply['user_expressions']
nt.assert_equals(user_expressions, {u'foo' : u'2'})
flush_channels()
def test_oinfo():
shell = KM.shell_channel
msg_id = shell.object_info('a')
reply = shell.get_msg(timeout=2)
validate_message(reply, 'object_info_reply', msg_id)
flush_channels()
def test_oinfo_found():
shell = KM.shell_channel
msg_id, reply = execute(code='a=5')
msg_id = shell.object_info('a')
reply = shell.get_msg(timeout=2)
validate_message(reply, 'object_info_reply', msg_id)
content = reply['content']
nt.assert_true(content['found'])
flush_channels()
def test_complete():
shell = KM.shell_channel
msg_id, reply = execute(code="alpha = albert = 5")
msg_id = shell.complete('al', 'al', 2)
reply = shell.get_msg(timeout=2)
validate_message(reply, 'complete_reply', msg_id)
matches = reply['content']['matches']
for name in ('alpha', 'albert'):
nt.assert_true(name in matches, "Missing match: %r" % name)
flush_channels()
def test_stream():
msg_id, reply = execute("print('hi')")
stdout = KM.sub_channel.get_msg(timeout=2)
validate_message(stdout, 'stream', msg_id)
content = stdout['content']
nt.assert_equals(content['name'], u'stdout')
nt.assert_equals(content['data'], u'hi\n')
flush_channels()
def test_display():
msg_id, reply = execute("from IPython.core.display import display; display(1)")
display = KM.sub_channel.get_msg(timeout=2)
validate_message(display, 'display_data', parent=msg_id)
data = display['content']['data']
nt.assert_equals(data['text/plain'], u'1')
flush_channels()