ipkernel.py
367 lines
| 13.2 KiB
| text/x-python
|
PythonLexer
Fernando Perez
|
r2598 | #!/usr/bin/env python | ||
"""A simple interactive kernel that talks to a frontend over 0MQ. | ||||
Things to do: | ||||
* Implement `set_parent` logic. Right before doing exec, the Kernel should | ||||
call set_parent on all the PUB objects with the message about to be executed. | ||||
* Implement random port and security key logic. | ||||
* Implement control messages. | ||||
* Implement event loop and poll version. | ||||
""" | ||||
epatters
|
r2712 | #----------------------------------------------------------------------------- | ||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
epatters
|
r2641 | # Standard library imports. | ||
Fernando Perez
|
r2598 | import __builtin__ | ||
epatters
|
r2708 | from code import CommandCompiler | ||
epatters
|
r2700 | import os | ||
Fernando Perez
|
r2598 | import sys | ||
import time | ||||
import traceback | ||||
epatters
|
r2641 | # System library imports. | ||
Fernando Perez
|
r2598 | import zmq | ||
epatters
|
r2641 | # Local imports. | ||
Brian Granger
|
r2759 | from IPython.config.configurable import Configurable | ||
Brian Granger
|
r2773 | from IPython.zmq.zmqshell import ZMQInteractiveShell | ||
epatters
|
r2667 | from IPython.external.argparse import ArgumentParser | ||
Brian Granger
|
r2759 | from IPython.utils.traitlets import Instance | ||
from IPython.zmq.session import Session, Message | ||||
Fernando Perez
|
r2598 | from completer import KernelCompleter | ||
Brian Granger
|
r2759 | from iostream import OutStream | ||
from displayhook import DisplayHook | ||||
from exitpoller import ExitPollerUnix, ExitPollerWindows | ||||
Fernando Perez
|
r2598 | |||
epatters
|
r2712 | #----------------------------------------------------------------------------- | ||
Brian Granger
|
r2754 | # Main kernel class | ||
epatters
|
r2712 | #----------------------------------------------------------------------------- | ||
epatters
|
r2641 | |||
Brian Granger
|
r2759 | class Kernel(Configurable): | ||
Brian Granger
|
r2760 | shell = Instance('IPython.core.interactiveshell.InteractiveShellABC') | ||
Brian Granger
|
r2759 | session = Instance('IPython.zmq.session.Session') | ||
reply_socket = Instance('zmq.Socket') | ||||
pub_socket = Instance('zmq.Socket') | ||||
req_socket = Instance('zmq.Socket') | ||||
def __init__(self, **kwargs): | ||||
super(Kernel, self).__init__(**kwargs) | ||||
Brian Granger
|
r2773 | self.shell = ZMQInteractiveShell.instance() | ||
Fernando Perez
|
r2598 | |||
# Build dict of handlers for message types | ||||
epatters
|
r2612 | msg_types = [ 'execute_request', 'complete_request', | ||
'object_info_request' ] | ||||
Fernando Perez
|
r2598 | self.handlers = {} | ||
epatters
|
r2612 | for msg_type in msg_types: | ||
Fernando Perez
|
r2598 | self.handlers[msg_type] = getattr(self, msg_type) | ||
def abort_queue(self): | ||||
while True: | ||||
try: | ||||
ident = self.reply_socket.recv(zmq.NOBLOCK) | ||||
except zmq.ZMQError, e: | ||||
if e.errno == zmq.EAGAIN: | ||||
break | ||||
else: | ||||
assert self.reply_socket.rcvmore(), "Unexpected missing message part." | ||||
msg = self.reply_socket.recv_json() | ||||
print>>sys.__stdout__, "Aborting:" | ||||
print>>sys.__stdout__, Message(msg) | ||||
msg_type = msg['msg_type'] | ||||
reply_type = msg_type.split('_')[0] + '_reply' | ||||
reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) | ||||
print>>sys.__stdout__, Message(reply_msg) | ||||
self.reply_socket.send(ident,zmq.SNDMORE) | ||||
self.reply_socket.send_json(reply_msg) | ||||
# We need to wait a bit for requests to come in. This can probably | ||||
# be set shorter for true asynchronous clients. | ||||
time.sleep(0.1) | ||||
def execute_request(self, ident, parent): | ||||
try: | ||||
code = parent[u'content'][u'code'] | ||||
except: | ||||
print>>sys.__stderr__, "Got bad msg: " | ||||
print>>sys.__stderr__, Message(parent) | ||||
return | ||||
pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | ||||
self.pub_socket.send_json(pyin_msg) | ||||
epatters
|
r2722 | |||
Fernando Perez
|
r2598 | try: | ||
epatters
|
r2730 | # Replace raw_input. Note that is not sufficient to replace | ||
# raw_input in the user namespace. | ||||
raw_input = lambda prompt='': self.raw_input(prompt, ident, parent) | ||||
__builtin__.raw_input = raw_input | ||||
# Configure the display hook. | ||||
Fernando Perez
|
r2598 | sys.displayhook.set_parent(parent) | ||
epatters
|
r2730 | |||
Brian Granger
|
r2759 | self.shell.runlines(code) | ||
# exec comp_code in self.user_ns, self.user_ns | ||||
Fernando Perez
|
r2598 | except: | ||
etype, evalue, tb = sys.exc_info() | ||||
tb = traceback.format_exception(etype, evalue, tb) | ||||
exc_content = { | ||||
u'status' : u'error', | ||||
u'traceback' : tb, | ||||
epatters
|
r2716 | u'ename' : unicode(etype.__name__), | ||
Fernando Perez
|
r2598 | u'evalue' : unicode(evalue) | ||
} | ||||
exc_msg = self.session.msg(u'pyerr', exc_content, parent) | ||||
self.pub_socket.send_json(exc_msg) | ||||
reply_content = exc_content | ||||
else: | ||||
reply_content = {'status' : 'ok'} | ||||
epatters
|
r2722 | |||
# Flush output before sending the reply. | ||||
sys.stderr.flush() | ||||
sys.stdout.flush() | ||||
# Send the reply. | ||||
Fernando Perez
|
r2598 | reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | ||
print>>sys.__stdout__, Message(reply_msg) | ||||
self.reply_socket.send(ident, zmq.SNDMORE) | ||||
self.reply_socket.send_json(reply_msg) | ||||
if reply_msg['content']['status'] == u'error': | ||||
self.abort_queue() | ||||
epatters
|
r2730 | def raw_input(self, prompt, ident, parent): | ||
# Flush output before making the request. | ||||
sys.stderr.flush() | ||||
sys.stdout.flush() | ||||
# Send the input request. | ||||
content = dict(prompt=prompt) | ||||
msg = self.session.msg(u'input_request', content, parent) | ||||
self.req_socket.send_json(msg) | ||||
# Await a response. | ||||
reply = self.req_socket.recv_json() | ||||
try: | ||||
value = reply['content']['value'] | ||||
except: | ||||
print>>sys.__stderr__, "Got bad raw_input reply: " | ||||
print>>sys.__stderr__, Message(parent) | ||||
value = '' | ||||
return value | ||||
Fernando Perez
|
r2598 | def complete_request(self, ident, parent): | ||
matches = {'matches' : self.complete(parent), | ||||
'status' : 'ok'} | ||||
completion_msg = self.session.send(self.reply_socket, 'complete_reply', | ||||
matches, parent, ident) | ||||
print >> sys.__stdout__, completion_msg | ||||
def complete(self, msg): | ||||
Brian Granger
|
r2759 | return self.shell.complete(msg.content.line) | ||
Fernando Perez
|
r2598 | |||
epatters
|
r2612 | def object_info_request(self, ident, parent): | ||
context = parent['content']['oname'].split('.') | ||||
object_info = self.object_info(context) | ||||
msg = self.session.send(self.reply_socket, 'object_info_reply', | ||||
object_info, parent, ident) | ||||
print >> sys.__stdout__, msg | ||||
def object_info(self, context): | ||||
symbol, leftover = self.symbol_from_context(context) | ||||
if symbol is not None and not leftover: | ||||
doc = getattr(symbol, '__doc__', '') | ||||
else: | ||||
doc = '' | ||||
object_info = dict(docstring = doc) | ||||
return object_info | ||||
def symbol_from_context(self, context): | ||||
if not context: | ||||
return None, context | ||||
base_symbol_string = context[0] | ||||
Brian Granger
|
r2759 | symbol = self.shell.user_ns.get(base_symbol_string, None) | ||
epatters
|
r2612 | if symbol is None: | ||
symbol = __builtin__.__dict__.get(base_symbol_string, None) | ||||
if symbol is None: | ||||
return None, context | ||||
context = context[1:] | ||||
for i, name in enumerate(context): | ||||
new_symbol = getattr(symbol, name, None) | ||||
if new_symbol is None: | ||||
return symbol, context[i:] | ||||
else: | ||||
symbol = new_symbol | ||||
return symbol, [] | ||||
Fernando Perez
|
r2598 | def start(self): | ||
while True: | ||||
ident = self.reply_socket.recv() | ||||
epatters
|
r2730 | assert self.reply_socket.rcvmore(), "Missing message part." | ||
Fernando Perez
|
r2598 | msg = self.reply_socket.recv_json() | ||
omsg = Message(msg) | ||||
Brian Granger
|
r2606 | print>>sys.__stdout__ | ||
Fernando Perez
|
r2598 | print>>sys.__stdout__, omsg | ||
handler = self.handlers.get(omsg.msg_type, None) | ||||
if handler is None: | ||||
print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg | ||||
else: | ||||
handler(ident, omsg) | ||||
epatters
|
r2712 | #----------------------------------------------------------------------------- | ||
# Kernel main and launch functions | ||||
#----------------------------------------------------------------------------- | ||||
epatters
|
r2641 | def bind_port(socket, ip, port): | ||
""" Binds the specified ZMQ socket. If the port is less than zero, a random | ||||
port is chosen. Returns the port that was bound. | ||||
""" | ||||
connection = 'tcp://%s' % ip | ||||
epatters
|
r2704 | if port <= 0: | ||
epatters
|
r2641 | port = socket.bind_to_random_port(connection) | ||
else: | ||||
connection += ':%i' % port | ||||
socket.bind(connection) | ||||
return port | ||||
epatters
|
r2713 | |||
epatters
|
r2667 | def main(): | ||
""" Main entry point for launching a kernel. | ||||
epatters
|
r2641 | """ | ||
epatters
|
r2667 | # Parse command line arguments. | ||
parser = ArgumentParser() | ||||
parser.add_argument('--ip', type=str, default='127.0.0.1', | ||||
help='set the kernel\'s IP address [default: local]') | ||||
Brian Granger
|
r2690 | parser.add_argument('--xrep', type=int, metavar='PORT', default=0, | ||
epatters
|
r2702 | help='set the XREP channel port [default: random]') | ||
Brian Granger
|
r2690 | parser.add_argument('--pub', type=int, metavar='PORT', default=0, | ||
epatters
|
r2702 | help='set the PUB channel port [default: random]') | ||
parser.add_argument('--req', type=int, metavar='PORT', default=0, | ||||
help='set the REQ channel port [default: random]') | ||||
epatters
|
r2712 | if sys.platform == 'win32': | ||
parser.add_argument('--parent', type=int, metavar='HANDLE', | ||||
default=0, help='kill this process if the process ' | ||||
'with HANDLE dies') | ||||
else: | ||||
parser.add_argument('--parent', action='store_true', | ||||
help='kill this process if its parent dies') | ||||
epatters
|
r2667 | namespace = parser.parse_args() | ||
epatters
|
r2700 | # Create a context, a session, and the kernel sockets. | ||
Fernando Perez
|
r2598 | print >>sys.__stdout__, "Starting the kernel..." | ||
epatters
|
r2641 | context = zmq.Context() | ||
Fernando Perez
|
r2598 | session = Session(username=u'kernel') | ||
epatters
|
r2641 | reply_socket = context.socket(zmq.XREP) | ||
epatters
|
r2667 | xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep) | ||
print >>sys.__stdout__, "XREP Channel on port", xrep_port | ||||
Fernando Perez
|
r2598 | |||
epatters
|
r2641 | pub_socket = context.socket(zmq.PUB) | ||
epatters
|
r2667 | pub_port = bind_port(pub_socket, namespace.ip, namespace.pub) | ||
epatters
|
r2641 | print >>sys.__stdout__, "PUB Channel on port", pub_port | ||
Fernando Perez
|
r2598 | |||
epatters
|
r2705 | req_socket = context.socket(zmq.XREQ) | ||
req_port = bind_port(req_socket, namespace.ip, namespace.req) | ||||
print >>sys.__stdout__, "REQ Channel on port", req_port | ||||
Brian Granger
|
r2775 | # Redirect input streams. This needs to be done before the Kernel is done | ||
# because currently the Kernel creates a ZMQInteractiveShell, which | ||||
# holds references to sys.stdout and sys.stderr. | ||||
Brian Granger
|
r2767 | sys.stdout = OutStream(session, pub_socket, u'stdout') | ||
sys.stderr = OutStream(session, pub_socket, u'stderr') | ||||
Brian Granger
|
r2775 | # Set a displayhook. | ||
epatters
|
r2641 | sys.displayhook = DisplayHook(session, pub_socket) | ||
Fernando Perez
|
r2598 | |||
epatters
|
r2700 | # Create the kernel. | ||
Brian Granger
|
r2759 | kernel = Kernel( | ||
session=session, reply_socket=reply_socket, | ||||
pub_socket=pub_socket, req_socket=req_socket | ||||
) | ||||
Fernando Perez
|
r2598 | |||
epatters
|
r2700 | # Configure this kernel/process to die on parent termination, if necessary. | ||
epatters
|
r2712 | if namespace.parent: | ||
epatters
|
r2713 | if sys.platform == 'win32': | ||
poller = ExitPollerWindows(namespace.parent) | ||||
epatters
|
r2712 | else: | ||
epatters
|
r2713 | poller = ExitPollerUnix() | ||
poller.start() | ||||
epatters
|
r2700 | |||
# Start the kernel mainloop. | ||||
Fernando Perez
|
r2598 | kernel.start() | ||
epatters
|
r2704 | |||
epatters
|
r2702 | def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False): | ||
epatters
|
r2700 | """ Launches a localhost kernel, binding to the specified ports. | ||
Parameters | ||||
---------- | ||||
xrep_port : int, optional | ||||
The port to use for XREP channel. | ||||
pub_port : int, optional | ||||
epatters
|
r2702 | The port to use for the SUB channel. | ||
req_port : int, optional | ||||
The port to use for the REQ (raw input) channel. | ||||
epatters
|
r2641 | |||
epatters
|
r2700 | independent : bool, optional (default False) | ||
If set, the kernel process is guaranteed to survive if this process | ||||
dies. If not set, an effort is made to ensure that the kernel is killed | ||||
when this process dies. Note that in this case it is still good practice | ||||
epatters
|
r2712 | to kill kernels manually before exiting. | ||
epatters
|
r2700 | |||
Returns | ||||
------- | ||||
A tuple of form: | ||||
epatters
|
r2702 | (kernel_process, xrep_port, pub_port, req_port) | ||
where kernel_process is a Popen object and the ports are integers. | ||||
epatters
|
r2641 | """ | ||
import socket | ||||
from subprocess import Popen | ||||
epatters
|
r2667 | # Find open ports as necessary. | ||
epatters
|
r2641 | ports = [] | ||
epatters
|
r2702 | ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0) | ||
epatters
|
r2667 | for i in xrange(ports_needed): | ||
epatters
|
r2641 | sock = socket.socket() | ||
sock.bind(('', 0)) | ||||
ports.append(sock) | ||||
for i, sock in enumerate(ports): | ||||
port = sock.getsockname()[1] | ||||
sock.close() | ||||
ports[i] = port | ||||
epatters
|
r2700 | if xrep_port <= 0: | ||
xrep_port = ports.pop(0) | ||||
if pub_port <= 0: | ||||
pub_port = ports.pop(0) | ||||
epatters
|
r2702 | if req_port <= 0: | ||
req_port = ports.pop(0) | ||||
epatters
|
r2667 | |||
epatters
|
r2641 | # Spawn a kernel. | ||
Brian Granger
|
r2755 | command = 'from IPython.zmq.ipkernel import main; main()' | ||
epatters
|
r2702 | arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port), | ||
'--pub', str(pub_port), '--req', str(req_port) ] | ||||
epatters
|
r2700 | if independent: | ||
if sys.platform == 'win32': | ||||
proc = Popen(['start', '/b'] + arguments, shell=True) | ||||
else: | ||||
proc = Popen(arguments, preexec_fn=lambda: os.setsid()) | ||||
else: | ||||
epatters
|
r2712 | if sys.platform == 'win32': | ||
from _subprocess import DuplicateHandle, GetCurrentProcess, \ | ||||
DUPLICATE_SAME_ACCESS | ||||
pid = GetCurrentProcess() | ||||
handle = DuplicateHandle(pid, pid, pid, 0, | ||||
True, # Inheritable by new processes. | ||||
DUPLICATE_SAME_ACCESS) | ||||
proc = Popen(arguments + ['--parent', str(int(handle))]) | ||||
else: | ||||
proc = Popen(arguments + ['--parent']) | ||||
epatters
|
r2700 | |||
epatters
|
r2702 | return proc, xrep_port, pub_port, req_port | ||
epatters
|
r2667 | |||
Fernando Perez
|
r2598 | |||
if __name__ == '__main__': | ||||
epatters
|
r2667 | main() | ||