##// END OF EJS Templates
Improve tooltip tringgering,make it configurable...
Improve tooltip tringgering,make it configurable As until now, when pressing tab and a white space was preceding the cursor The completion was triggerd with the whole namespace in it. Now if a whitespace or an opening bracket is just befor the cursor it will try to display a tooltip. The logic to find what object_info_request is send have been sightly changed to try to match the expression just before the last unmached openig bracket before the cursor (without considering what is after the cursor). example (_|_ represent the cursor): >>> his_|_<tab> # completion >>> hist(_|_<tab> # tooltip on hist >>> hist(rand(20),bins=range(_|_ <tab> #tooltip on range >>> hist(rand(20),bins=range(10), _|_ <tab> # tooltip on hist (whitespace before cursor) >>> hist(rand(20),bins=range(10),_|_ <tab> # completion as we dont care of what is after the cursor: >>> hist(rand(5000), bins=50, _|_orientaion='horizontal') # and tab, equivalent to >>> hist(rand(5000), bins=50, _|_<tab> # onte the space again >>> hist(_|_rand(5000), bins=50, orientaion='horizontal') # and tab, equivalent to >>> hist(_|_ the 4 give tooltip on hist note that you can get tooltip on things that aren't function by appending a '(' like >>> matplotlib(<tab> Which is kinda weird... so we might want to bound another shortcut for tooltip, but which matches without bracket... additionnaly I have added a "Config" pannel in the left pannel with a checkbox bind to wether or not activate this functionnality Note, (rebase and edited commit, might not work perfetly xwithout the following ones)

File last commit:

r5344:293d3eed
r5399:f73c6ce0
Show More
streamkernel.py
439 lines | 15.6 KiB | text/x-python | PythonLexer
"""
Kernel adapted from kernel.py to use ZMQ Streams
Authors:
* Min RK
* Brian Granger
* Fernando Perez
* Evan Patterson
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
# Standard library imports.
from __future__ import print_function
import sys
import time
from code import CommandCompiler
from datetime import datetime
from pprint import pprint
# System library imports.
import zmq
from zmq.eventloop import ioloop, zmqstream
# Local imports.
from IPython.utils.traitlets import Instance, List, Integer, Dict, Set, Unicode, CBytes
from IPython.zmq.completer import KernelCompleter
from IPython.parallel.error import wrap_exception
from IPython.parallel.factory import SessionFactory
from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes
def printer(*args):
pprint(args, stream=sys.__stdout__)
class _Passer(zmqstream.ZMQStream):
"""Empty class that implements `send()` that does nothing.
Subclass ZMQStream for Session typechecking
"""
def __init__(self, *args, **kwargs):
pass
def send(self, *args, **kwargs):
pass
send_multipart = send
#-----------------------------------------------------------------------------
# Main kernel class
#-----------------------------------------------------------------------------
class Kernel(SessionFactory):
#---------------------------------------------------------------------------
# Kernel interface
#---------------------------------------------------------------------------
# kwargs:
exec_lines = List(Unicode, config=True,
help="List of lines to execute")
# identities:
int_id = Integer(-1)
bident = CBytes()
ident = Unicode()
def _ident_changed(self, name, old, new):
self.bident = asbytes(new)
user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
control_stream = Instance(zmqstream.ZMQStream)
task_stream = Instance(zmqstream.ZMQStream)
iopub_stream = Instance(zmqstream.ZMQStream)
client = Instance('IPython.parallel.Client')
# internals
shell_streams = List()
compiler = Instance(CommandCompiler, (), {})
completer = Instance(KernelCompleter)
aborted = Set()
shell_handlers = Dict()
control_handlers = Dict()
def _set_prefix(self):
self.prefix = "engine.%s"%self.int_id
def _connect_completer(self):
self.completer = KernelCompleter(self.user_ns)
def __init__(self, **kwargs):
super(Kernel, self).__init__(**kwargs)
self._set_prefix()
self._connect_completer()
self.on_trait_change(self._set_prefix, 'id')
self.on_trait_change(self._connect_completer, 'user_ns')
# Build dict of handlers for message types
for msg_type in ['execute_request', 'complete_request', 'apply_request',
'clear_request']:
self.shell_handlers[msg_type] = getattr(self, msg_type)
for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
self.control_handlers[msg_type] = getattr(self, msg_type)
self._initial_exec_lines()
def _wrap_exception(self, method=None):
e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
content=wrap_exception(e_info)
return content
def _initial_exec_lines(self):
s = _Passer()
content = dict(silent=True, user_variable=[],user_expressions=[])
for line in self.exec_lines:
self.log.debug("executing initialization: %s"%line)
content.update({'code':line})
msg = self.session.msg('execute_request', content)
self.execute_request(s, [], msg)
#-------------------- control handlers -----------------------------
def abort_queues(self):
for stream in self.shell_streams:
if stream:
self.abort_queue(stream)
def abort_queue(self, stream):
while True:
idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
if msg is None:
return
self.log.info("Aborting:")
self.log.info(str(msg))
msg_type = msg['header']['msg_type']
reply_type = msg_type.split('_')[0] + '_reply'
# reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
# self.reply_socket.send(ident,zmq.SNDMORE)
# self.reply_socket.send_json(reply_msg)
reply_msg = self.session.send(stream, reply_type,
content={'status' : 'aborted'}, parent=msg, ident=idents)
self.log.debug(str(reply_msg))
# We need to wait a bit for requests to come in. This can probably
# be set shorter for true asynchronous clients.
time.sleep(0.05)
def abort_request(self, stream, ident, parent):
"""abort a specifig msg by id"""
msg_ids = parent['content'].get('msg_ids', None)
if isinstance(msg_ids, basestring):
msg_ids = [msg_ids]
if not msg_ids:
self.abort_queues()
for mid in msg_ids:
self.aborted.add(str(mid))
content = dict(status='ok')
reply_msg = self.session.send(stream, 'abort_reply', content=content,
parent=parent, ident=ident)
self.log.debug(str(reply_msg))
def shutdown_request(self, stream, ident, parent):
"""kill ourself. This should really be handled in an external process"""
try:
self.abort_queues()
except:
content = self._wrap_exception('shutdown')
else:
content = dict(parent['content'])
content['status'] = 'ok'
msg = self.session.send(stream, 'shutdown_reply',
content=content, parent=parent, ident=ident)
self.log.debug(str(msg))
dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
dc.start()
def dispatch_control(self, msg):
idents,msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.unserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid Message", exc_info=True)
return
else:
self.log.debug("Control received, %s", msg)
header = msg['header']
msg_id = header['msg_id']
msg_type = header['msg_type']
handler = self.control_handlers.get(msg_type, None)
if handler is None:
self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg_type)
else:
handler(self.control_stream, idents, msg)
#-------------------- queue helpers ------------------------------
def check_dependencies(self, dependencies):
if not dependencies:
return True
if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
anyorall = dependencies[0]
dependencies = dependencies[1]
else:
anyorall = 'all'
results = self.client.get_results(dependencies,status_only=True)
if results['status'] != 'ok':
return False
if anyorall == 'any':
if not results['completed']:
return False
else:
if results['pending']:
return False
return True
def check_aborted(self, msg_id):
return msg_id in self.aborted
#-------------------- queue handlers -----------------------------
def clear_request(self, stream, idents, parent):
"""Clear our namespace."""
self.user_ns = {}
msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
content = dict(status='ok'))
self._initial_exec_lines()
def execute_request(self, stream, ident, parent):
self.log.debug('execute request %s'%parent)
try:
code = parent[u'content'][u'code']
except:
self.log.error("Got bad msg: %s"%parent, exc_info=True)
return
self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
ident=asbytes('%s.pyin'%self.prefix))
started = datetime.now()
try:
comp_code = self.compiler(code, '<zmq-kernel>')
# allow for not overriding displayhook
if hasattr(sys.displayhook, 'set_parent'):
sys.displayhook.set_parent(parent)
sys.stdout.set_parent(parent)
sys.stderr.set_parent(parent)
exec comp_code in self.user_ns, self.user_ns
except:
exc_content = self._wrap_exception('execute')
# exc_msg = self.session.msg(u'pyerr', exc_content, parent)
self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
ident=asbytes('%s.pyerr'%self.prefix))
reply_content = exc_content
else:
reply_content = {'status' : 'ok'}
reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
ident=ident, subheader = dict(started=started))
self.log.debug(str(reply_msg))
if reply_msg['content']['status'] == u'error':
self.abort_queues()
def complete_request(self, stream, ident, parent):
matches = {'matches' : self.complete(parent),
'status' : 'ok'}
completion_msg = self.session.send(stream, 'complete_reply',
matches, parent, ident)
# print >> sys.__stdout__, completion_msg
def complete(self, msg):
return self.completer.complete(msg.content.line, msg.content.text)
def apply_request(self, stream, ident, parent):
# flush previous reply, so this request won't block it
stream.flush(zmq.POLLOUT)
try:
content = parent[u'content']
bufs = parent[u'buffers']
msg_id = parent['header']['msg_id']
# bound = parent['header'].get('bound', False)
except:
self.log.error("Got bad msg: %s"%parent, exc_info=True)
return
# pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
# self.iopub_stream.send(pyin_msg)
# self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
sub = {'dependencies_met' : True, 'engine' : self.ident,
'started': datetime.now()}
try:
# allow for not overriding displayhook
if hasattr(sys.displayhook, 'set_parent'):
sys.displayhook.set_parent(parent)
sys.stdout.set_parent(parent)
sys.stderr.set_parent(parent)
# exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
working = self.user_ns
# suffix =
prefix = "_"+str(msg_id).replace("-","")+"_"
f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
# if bound:
# bound_ns = Namespace(working)
# args = [bound_ns]+list(args)
fname = getattr(f, '__name__', 'f')
fname = prefix+"f"
argname = prefix+"args"
kwargname = prefix+"kwargs"
resultname = prefix+"result"
ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
# print ns
working.update(ns)
code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
try:
exec code in working,working
result = working.get(resultname)
finally:
for key in ns.iterkeys():
working.pop(key)
# if bound:
# working.update(bound_ns)
packed_result,buf = serialize_object(result)
result_buf = [packed_result]+buf
except:
exc_content = self._wrap_exception('apply')
# exc_msg = self.session.msg(u'pyerr', exc_content, parent)
self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
ident=asbytes('%s.pyerr'%self.prefix))
reply_content = exc_content
result_buf = []
if exc_content['ename'] == 'UnmetDependency':
sub['dependencies_met'] = False
else:
reply_content = {'status' : 'ok'}
# put 'ok'/'error' status in header, for scheduler introspection:
sub['status'] = reply_content['status']
reply_msg = self.session.send(stream, u'apply_reply', reply_content,
parent=parent, ident=ident,buffers=result_buf, subheader=sub)
# flush i/o
# should this be before reply_msg is sent, like in the single-kernel code,
# or should nothing get in the way of real results?
sys.stdout.flush()
sys.stderr.flush()
def dispatch_queue(self, stream, msg):
self.control_stream.flush()
idents,msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.unserialize(msg, content=True, copy=False)
except:
self.log.error("Invalid Message", exc_info=True)
return
else:
self.log.debug("Message received, %s", msg)
header = msg['header']
msg_id = header['msg_id']
msg_type = msg['header']['msg_type']
if self.check_aborted(msg_id):
self.aborted.remove(msg_id)
# is it safe to assume a msg_id will not be resubmitted?
reply_type = msg_type.split('_')[0] + '_reply'
status = {'status' : 'aborted'}
reply_msg = self.session.send(stream, reply_type, subheader=status,
content=status, parent=msg, ident=idents)
return
handler = self.shell_handlers.get(msg_type, None)
if handler is None:
self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg_type)
else:
handler(stream, idents, msg)
def start(self):
#### stream mode:
if self.control_stream:
self.control_stream.on_recv(self.dispatch_control, copy=False)
self.control_stream.on_err(printer)
def make_dispatcher(stream):
def dispatcher(msg):
return self.dispatch_queue(stream, msg)
return dispatcher
for s in self.shell_streams:
s.on_recv(make_dispatcher(s), copy=False)
s.on_err(printer)
if self.iopub_stream:
self.iopub_stream.on_err(printer)
#### while True mode:
# while True:
# idle = True
# try:
# msg = self.shell_stream.socket.recv_multipart(
# zmq.NOBLOCK, copy=False)
# except zmq.ZMQError, e:
# if e.errno != zmq.EAGAIN:
# raise e
# else:
# idle=False
# self.dispatch_queue(self.shell_stream, msg)
#
# if not self.task_stream.empty():
# idle=False
# msg = self.task_stream.recv_multipart()
# self.dispatch_queue(self.task_stream, msg)
# if idle:
# # don't busywait
# time.sleep(1e-3)