##// END OF EJS Templates
Possible fix for GH-169
MinRK -
Show More
@@ -1,178 +1,182 b''
1 1 """ A minimal application using the Qt console-style IPython frontend.
2 2 """
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Imports
6 6 #-----------------------------------------------------------------------------
7 7
8 8 # Systemm library imports
9 9 from PyQt4 import QtGui
10 10
11 11 # Local imports
12 12 from IPython.external.argparse import ArgumentParser
13 13 from IPython.frontend.qt.console.frontend_widget import FrontendWidget
14 14 from IPython.frontend.qt.console.ipython_widget import IPythonWidget
15 15 from IPython.frontend.qt.console.rich_ipython_widget import RichIPythonWidget
16 16 from IPython.frontend.qt.kernelmanager import QtKernelManager
17 17
18 18 #-----------------------------------------------------------------------------
19 # Constants
19 # Network Constants
20 20 #-----------------------------------------------------------------------------
21 21
22 LOCALHOST = '127.0.0.1'
22 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
23 23
24 24 #-----------------------------------------------------------------------------
25 25 # Classes
26 26 #-----------------------------------------------------------------------------
27 27
28 28 class MainWindow(QtGui.QMainWindow):
29 29
30 30 #---------------------------------------------------------------------------
31 31 # 'object' interface
32 32 #---------------------------------------------------------------------------
33 33
34 34 def __init__(self, app, frontend, existing=False, may_close=True):
35 35 """ Create a MainWindow for the specified FrontendWidget.
36 36
37 37 The app is passed as an argument to allow for different
38 38 closing behavior depending on whether we are the Kernel's parent.
39 39
40 40 If existing is True, then this Console does not own the Kernel.
41 41
42 42 If may_close is True, then this Console is permitted to close the kernel
43 43 """
44 44 super(MainWindow, self).__init__()
45 45 self._app = app
46 46 self._frontend = frontend
47 47 self._existing = existing
48 if not existing:
48 if existing:
49 49 self._may_close = may_close
50 50 else:
51 51 self._may_close = True
52 52 self._frontend.exit_requested.connect(self.close)
53 53 self.setCentralWidget(frontend)
54 54
55 55 #---------------------------------------------------------------------------
56 56 # QWidget interface
57 57 #---------------------------------------------------------------------------
58 58
59 59 def closeEvent(self, event):
60 60 """ Reimplemented to prompt the user and close the kernel cleanly.
61 61 """
62 62 kernel_manager = self._frontend.kernel_manager
63 63 if kernel_manager and kernel_manager.channels_running:
64 64 title = self.window().windowTitle()
65 65 if self._may_close:
66 66 reply = QtGui.QMessageBox.question(self, title,
67 67 "You are closing this Console window."+
68 68 "\nWould you like to quit the Kernel and all attached Consoles as well?",
69 69 'Cancel', 'No, just this Console', 'Yes, quit everything')
70 70 if reply == 2: # close All
71 71 kernel_manager.shutdown_kernel()
72 72 #kernel_manager.stop_channels()
73 73 event.accept()
74 74 elif reply == 1: # close Console
75 75 if not self._existing:
76 76 # I have the kernel: don't quit, just close the window
77 77 self._app.setQuitOnLastWindowClosed(False)
78 78 self.deleteLater()
79 79 event.accept()
80 80 else:
81 81 event.ignore()
82 82 else:
83 83 reply = QtGui.QMessageBox.question(self, title,
84 84 "Are you sure you want to close this Console?\n"+
85 85 "The Kernel and other Consoles will remain active.",
86 86 QtGui.QMessageBox.Yes, QtGui.QMessageBox.No
87 87 )
88 88 if reply == QtGui.QMessageBox.Yes:
89 89 event.accept()
90 90 else:
91 91 event.ignore()
92 92
93 93
94 94 #-----------------------------------------------------------------------------
95 95 # Main entry point
96 96 #-----------------------------------------------------------------------------
97 97
98 98 def main():
99 99 """ Entry point for application.
100 100 """
101 101 # Parse command line arguments.
102 102 parser = ArgumentParser()
103 103 kgroup = parser.add_argument_group('kernel options')
104 104 kgroup.add_argument('-e', '--existing', action='store_true',
105 105 help='connect to an existing kernel')
106 106 kgroup.add_argument('--ip', type=str, default=LOCALHOST,
107 107 help='set the kernel\'s IP address [default localhost]')
108 108 kgroup.add_argument('--xreq', type=int, metavar='PORT', default=0,
109 109 help='set the XREQ channel port [default random]')
110 110 kgroup.add_argument('--sub', type=int, metavar='PORT', default=0,
111 111 help='set the SUB channel port [default random]')
112 112 kgroup.add_argument('--rep', type=int, metavar='PORT', default=0,
113 113 help='set the REP channel port [default random]')
114 114 kgroup.add_argument('--hb', type=int, metavar='PORT', default=0,
115 115 help='set the heartbeat port [default: random]')
116 116
117 117 egroup = kgroup.add_mutually_exclusive_group()
118 118 egroup.add_argument('--pure', action='store_true', help = \
119 119 'use a pure Python kernel instead of an IPython kernel')
120 120 egroup.add_argument('--pylab', type=str, metavar='GUI', nargs='?',
121 121 const='auto', help = \
122 122 "Pre-load matplotlib and numpy for interactive use. If GUI is not \
123 123 given, the GUI backend is matplotlib's, otherwise use one of: \
124 124 ['tk', 'gtk', 'qt', 'wx', 'inline'].")
125 125
126 126 wgroup = parser.add_argument_group('widget options')
127 127 wgroup.add_argument('--paging', type=str, default='inside',
128 128 choices = ['inside', 'hsplit', 'vsplit', 'none'],
129 129 help='set the paging style [default inside]')
130 130 wgroup.add_argument('--rich', action='store_true',
131 131 help='enable rich text support')
132 132 wgroup.add_argument('--gui-completion', action='store_true',
133 133 help='use a GUI widget for tab completion')
134 134
135 135 args = parser.parse_args()
136 136
137 137 # Don't let Qt or ZMQ swallow KeyboardInterupts.
138 138 import signal
139 139 signal.signal(signal.SIGINT, signal.SIG_DFL)
140 140
141 141 # Create a KernelManager and start a kernel.
142 142 kernel_manager = QtKernelManager(xreq_address=(args.ip, args.xreq),
143 143 sub_address=(args.ip, args.sub),
144 144 rep_address=(args.ip, args.rep),
145 145 hb_address=(args.ip, args.hb))
146 146 if not args.existing:
147 # if not args.ip in LOCAL_IPS+ALL_ALIAS:
148 # raise ValueError("Must bind a local ip, such as: %s"%LOCAL_IPS)
149
150 kwargs = dict(ip=args.ip)
147 151 if args.pure:
148 kernel_manager.start_kernel(ipython=False)
152 kwargs['ipython']=False
149 153 elif args.pylab:
150 kernel_manager.start_kernel(pylab=args.pylab)
151 else:
152 kernel_manager.start_kernel()
154 kwargs['pylab']=args.pylab
155
156 kernel_manager.start_kernel(**kwargs)
153 157 kernel_manager.start_channels()
154 158
155 159 local_kernel = (not args.existing) or args.ip == LOCALHOST
156 160 # Create the widget.
157 161 app = QtGui.QApplication([])
158 162 if args.pure:
159 163 kind = 'rich' if args.rich else 'plain'
160 164 widget = FrontendWidget(kind=kind, paging=args.paging, local_kernel=local_kernel)
161 165 elif args.rich or args.pylab:
162 166 widget = RichIPythonWidget(paging=args.paging, local_kernel=local_kernel)
163 167 else:
164 168 widget = IPythonWidget(paging=args.paging, local_kernel=local_kernel)
165 169 widget.gui_completion = args.gui_completion
166 170 widget.kernel_manager = kernel_manager
167 171
168 172 # Create the main window.
169 173 window = MainWindow(app, widget, args.existing, may_close=local_kernel)
170 174 window.setWindowTitle('Python' if args.pure else 'IPython')
171 175 window.show()
172 176
173 177 # Start the application main loop.
174 178 app.exec_()
175 179
176 180
177 181 if __name__ == '__main__':
178 182 main()
@@ -1,259 +1,260 b''
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3 3 """
4 4
5 5 # Standard library imports.
6 6 import atexit
7 7 import os
8 8 import socket
9 9 from subprocess import Popen, PIPE
10 10 import sys
11 11
12 12 # System library imports.
13 13 import zmq
14 14
15 15 # Local imports.
16 16 from IPython.core.ultratb import FormattedTB
17 17 from IPython.external.argparse import ArgumentParser
18 18 from IPython.utils import io
19 from IPython.utils.localinterfaces import LOCALHOST
19 20 from displayhook import DisplayHook
20 21 from heartbeat import Heartbeat
21 22 from iostream import OutStream
22 23 from parentpoller import ParentPollerUnix, ParentPollerWindows
23 24 from session import Session
24 25
25 26 def bind_port(socket, ip, port):
26 27 """ Binds the specified ZMQ socket. If the port is zero, a random port is
27 28 chosen. Returns the port that was bound.
28 29 """
29 30 connection = 'tcp://%s' % ip
30 31 if port <= 0:
31 32 port = socket.bind_to_random_port(connection)
32 33 else:
33 34 connection += ':%i' % port
34 35 socket.bind(connection)
35 36 return port
36 37
37 38
38 39 def make_argument_parser():
39 40 """ Creates an ArgumentParser for the generic arguments supported by all
40 41 kernel entry points.
41 42 """
42 43 parser = ArgumentParser()
43 parser.add_argument('--ip', type=str, default='127.0.0.1',
44 parser.add_argument('--ip', type=str, default=LOCALHOST,
44 45 help='set the kernel\'s IP address [default: local]')
45 46 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
46 47 help='set the XREP channel port [default: random]')
47 48 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
48 49 help='set the PUB channel port [default: random]')
49 50 parser.add_argument('--req', type=int, metavar='PORT', default=0,
50 51 help='set the REQ channel port [default: random]')
51 52 parser.add_argument('--hb', type=int, metavar='PORT', default=0,
52 53 help='set the heartbeat port [default: random]')
53 54
54 55 if sys.platform == 'win32':
55 56 parser.add_argument('--interrupt', type=int, metavar='HANDLE',
56 57 default=0, help='interrupt this process when '
57 58 'HANDLE is signaled')
58 59 parser.add_argument('--parent', type=int, metavar='HANDLE',
59 60 default=0, help='kill this process if the process '
60 61 'with HANDLE dies')
61 62 else:
62 63 parser.add_argument('--parent', action='store_true',
63 64 help='kill this process if its parent dies')
64 65
65 66 return parser
66 67
67 68
68 69 def make_kernel(namespace, kernel_factory,
69 70 out_stream_factory=None, display_hook_factory=None):
70 71 """ Creates a kernel, redirects stdout/stderr, and installs a display hook
71 72 and exception handler.
72 73 """
73 74 # If running under pythonw.exe, the interpreter will crash if more than 4KB
74 75 # of data is written to stdout or stderr. This is a bug that has been with
75 76 # Python for a very long time; see http://bugs.python.org/issue706263.
76 77 if sys.executable.endswith('pythonw.exe'):
77 78 blackhole = file(os.devnull, 'w')
78 79 sys.stdout = sys.stderr = blackhole
79 80 sys.__stdout__ = sys.__stderr__ = blackhole
80 81
81 82 # Install minimal exception handling
82 83 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
83 84 ostream=sys.__stdout__)
84 85
85 86 # Create a context, a session, and the kernel sockets.
86 87 io.raw_print("Starting the kernel at pid:", os.getpid())
87 88 context = zmq.Context()
88 89 # Uncomment this to try closing the context.
89 90 # atexit.register(context.close)
90 91 session = Session(username=u'kernel')
91 92
92 93 reply_socket = context.socket(zmq.XREP)
93 94 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
94 95 io.raw_print("XREP Channel on port", xrep_port)
95 96
96 97 pub_socket = context.socket(zmq.PUB)
97 98 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
98 99 io.raw_print("PUB Channel on port", pub_port)
99 100
100 101 req_socket = context.socket(zmq.XREQ)
101 102 req_port = bind_port(req_socket, namespace.ip, namespace.req)
102 103 io.raw_print("REQ Channel on port", req_port)
103 104
104 105 hb = Heartbeat(context, (namespace.ip, namespace.hb))
105 106 hb.start()
106 107 hb_port = hb.port
107 108 io.raw_print("Heartbeat REP Channel on port", hb_port)
108 109
109 110 # Helper to make it easier to connect to an existing kernel, until we have
110 111 # single-port connection negotiation fully implemented.
111 112 io.raw_print("To connect another client to this kernel, use:")
112 113 io.raw_print("-e --xreq {0} --sub {1} --rep {2} --hb {3}".format(
113 114 xrep_port, pub_port, req_port, hb_port))
114 115
115 116 # Redirect input streams and set a display hook.
116 117 if out_stream_factory:
117 118 sys.stdout = out_stream_factory(session, pub_socket, u'stdout')
118 119 sys.stderr = out_stream_factory(session, pub_socket, u'stderr')
119 120 if display_hook_factory:
120 121 sys.displayhook = display_hook_factory(session, pub_socket)
121 122
122 123 # Create the kernel.
123 124 kernel = kernel_factory(session=session, reply_socket=reply_socket,
124 125 pub_socket=pub_socket, req_socket=req_socket)
125 126 kernel.record_ports(xrep_port=xrep_port, pub_port=pub_port,
126 127 req_port=req_port, hb_port=hb_port)
127 128 return kernel
128 129
129 130
130 131 def start_kernel(namespace, kernel):
131 132 """ Starts a kernel.
132 133 """
133 134 # Configure this kernel process to poll the parent process, if necessary.
134 135 if sys.platform == 'win32':
135 136 if namespace.interrupt or namespace.parent:
136 137 poller = ParentPollerWindows(namespace.interrupt, namespace.parent)
137 138 poller.start()
138 139 elif namespace.parent:
139 140 poller = ParentPollerUnix()
140 141 poller.start()
141 142
142 143 # Start the kernel mainloop.
143 144 kernel.start()
144 145
145 146
146 147 def make_default_main(kernel_factory):
147 148 """ Creates the simplest possible kernel entry point.
148 149 """
149 150 def main():
150 151 namespace = make_argument_parser().parse_args()
151 152 kernel = make_kernel(namespace, kernel_factory, OutStream, DisplayHook)
152 153 start_kernel(namespace, kernel)
153 154 return main
154 155
155 156
156 157 def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
157 158 independent=False, extra_arguments=[]):
158 159 """ Launches a localhost kernel, binding to the specified ports.
159 160
160 161 Parameters
161 162 ----------
162 163 code : str,
163 164 A string of Python code that imports and executes a kernel entry point.
164 165
165 166 xrep_port : int, optional
166 167 The port to use for XREP channel.
167 168
168 169 pub_port : int, optional
169 170 The port to use for the SUB channel.
170 171
171 172 req_port : int, optional
172 173 The port to use for the REQ (raw input) channel.
173 174
174 175 hb_port : int, optional
175 176 The port to use for the hearbeat REP channel.
176 177
177 178 independent : bool, optional (default False)
178 179 If set, the kernel process is guaranteed to survive if this process
179 180 dies. If not set, an effort is made to ensure that the kernel is killed
180 181 when this process dies. Note that in this case it is still good practice
181 182 to kill kernels manually before exiting.
182 183
183 184 extra_arguments = list, optional
184 185 A list of extra arguments to pass when executing the launch code.
185 186
186 187 Returns
187 188 -------
188 189 A tuple of form:
189 190 (kernel_process, xrep_port, pub_port, req_port)
190 191 where kernel_process is a Popen object and the ports are integers.
191 192 """
192 193 # Find open ports as necessary.
193 194 ports = []
194 195 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + \
195 196 int(req_port <= 0) + int(hb_port <= 0)
196 197 for i in xrange(ports_needed):
197 198 sock = socket.socket()
198 199 sock.bind(('', 0))
199 200 ports.append(sock)
200 201 for i, sock in enumerate(ports):
201 202 port = sock.getsockname()[1]
202 203 sock.close()
203 204 ports[i] = port
204 205 if xrep_port <= 0:
205 206 xrep_port = ports.pop(0)
206 207 if pub_port <= 0:
207 208 pub_port = ports.pop(0)
208 209 if req_port <= 0:
209 210 req_port = ports.pop(0)
210 211 if hb_port <= 0:
211 212 hb_port = ports.pop(0)
212 213
213 214 # Build the kernel launch command.
214 215 arguments = [ sys.executable, '-c', code, '--xrep', str(xrep_port),
215 216 '--pub', str(pub_port), '--req', str(req_port),
216 217 '--hb', str(hb_port) ]
217 218 arguments.extend(extra_arguments)
218 219
219 220 # Spawn a kernel.
220 221 if sys.platform == 'win32':
221 222 # Create a Win32 event for interrupting the kernel.
222 223 interrupt_event = ParentPollerWindows.create_interrupt_event()
223 224 arguments += [ '--interrupt', str(int(interrupt_event)) ]
224 225
225 226 # If using pythonw, stdin, stdout, and stderr are invalid. Popen will
226 227 # fail unless they are suitably redirected. We don't read from the
227 228 # pipes, but they must exist.
228 229 redirect = PIPE if sys.executable.endswith('pythonw.exe') else None
229 230
230 231 if independent:
231 232 proc = Popen(arguments,
232 233 creationflags=512, # CREATE_NEW_PROCESS_GROUP
233 234 stdout=redirect, stderr=redirect, stdin=redirect)
234 235 else:
235 236 from _subprocess import DuplicateHandle, GetCurrentProcess, \
236 237 DUPLICATE_SAME_ACCESS
237 238 pid = GetCurrentProcess()
238 239 handle = DuplicateHandle(pid, pid, pid, 0,
239 240 True, # Inheritable by new processes.
240 241 DUPLICATE_SAME_ACCESS)
241 242 proc = Popen(arguments + ['--parent', str(int(handle))],
242 243 stdout=redirect, stderr=redirect, stdin=redirect)
243 244
244 245 # Attach the interrupt event to the Popen objet so it can be used later.
245 246 proc.win32_interrupt_event = interrupt_event
246 247
247 248 # Clean up pipes created to work around Popen bug.
248 249 if redirect is not None:
249 250 proc.stdout.close()
250 251 proc.stderr.close()
251 252 proc.stdin.close()
252 253
253 254 else:
254 255 if independent:
255 256 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
256 257 else:
257 258 proc = Popen(arguments + ['--parent'])
258 259
259 260 return proc, xrep_port, pub_port, req_port, hb_port
@@ -1,194 +1,195 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive frontend that talks to a kernel over 0MQ.
3 3 """
4 4
5 5 #-----------------------------------------------------------------------------
6 6 # Imports
7 7 #-----------------------------------------------------------------------------
8 8 # stdlib
9 9 import cPickle as pickle
10 10 import code
11 11 import readline
12 12 import sys
13 13 import time
14 14 import uuid
15 15
16 16 # our own
17 17 import zmq
18 18 import session
19 19 import completer
20 from IPython.utils.localinterfaces import LOCALHOST
20 21
21 22 #-----------------------------------------------------------------------------
22 23 # Classes and functions
23 24 #-----------------------------------------------------------------------------
24 25
25 26 class Console(code.InteractiveConsole):
26 27
27 28 def __init__(self, locals=None, filename="<console>",
28 29 session = session,
29 30 request_socket=None,
30 31 sub_socket=None):
31 32 code.InteractiveConsole.__init__(self, locals, filename)
32 33 self.session = session
33 34 self.request_socket = request_socket
34 35 self.sub_socket = sub_socket
35 36 self.backgrounded = 0
36 37 self.messages = {}
37 38
38 39 # Set tab completion
39 40 self.completer = completer.ClientCompleter(self, session, request_socket)
40 41 readline.parse_and_bind('tab: complete')
41 42 readline.parse_and_bind('set show-all-if-ambiguous on')
42 43 readline.set_completer(self.completer.complete)
43 44
44 45 # Set system prompts
45 46 sys.ps1 = 'Py>>> '
46 47 sys.ps2 = ' ... '
47 48 sys.ps3 = 'Out : '
48 49 # Build dict of handlers for message types
49 50 self.handlers = {}
50 51 for msg_type in ['pyin', 'pyout', 'pyerr', 'stream']:
51 52 self.handlers[msg_type] = getattr(self, 'handle_%s' % msg_type)
52 53
53 54 def handle_pyin(self, omsg):
54 55 if omsg.parent_header.session == self.session.session:
55 56 return
56 57 c = omsg.content.code.rstrip()
57 58 if c:
58 59 print '[IN from %s]' % omsg.parent_header.username
59 60 print c
60 61
61 62 def handle_pyout(self, omsg):
62 63 #print omsg # dbg
63 64 if omsg.parent_header.session == self.session.session:
64 65 print "%s%s" % (sys.ps3, omsg.content.data)
65 66 else:
66 67 print '[Out from %s]' % omsg.parent_header.username
67 68 print omsg.content.data
68 69
69 70 def print_pyerr(self, err):
70 71 print >> sys.stderr, err.etype,':', err.evalue
71 72 print >> sys.stderr, ''.join(err.traceback)
72 73
73 74 def handle_pyerr(self, omsg):
74 75 if omsg.parent_header.session == self.session.session:
75 76 return
76 77 print >> sys.stderr, '[ERR from %s]' % omsg.parent_header.username
77 78 self.print_pyerr(omsg.content)
78 79
79 80 def handle_stream(self, omsg):
80 81 if omsg.content.name == 'stdout':
81 82 outstream = sys.stdout
82 83 else:
83 84 outstream = sys.stderr
84 85 print >> outstream, '*ERR*',
85 86 print >> outstream, omsg.content.data,
86 87
87 88 def handle_output(self, omsg):
88 89 handler = self.handlers.get(omsg.msg_type, None)
89 90 if handler is not None:
90 91 handler(omsg)
91 92
92 93 def recv_output(self):
93 94 while True:
94 95 omsg = self.session.recv(self.sub_socket)
95 96 if omsg is None:
96 97 break
97 98 self.handle_output(omsg)
98 99
99 100 def handle_reply(self, rep):
100 101 # Handle any side effects on output channels
101 102 self.recv_output()
102 103 # Now, dispatch on the possible reply types we must handle
103 104 if rep is None:
104 105 return
105 106 if rep.content.status == 'error':
106 107 self.print_pyerr(rep.content)
107 108 elif rep.content.status == 'aborted':
108 109 print >> sys.stderr, "ERROR: ABORTED"
109 110 ab = self.messages[rep.parent_header.msg_id].content
110 111 if 'code' in ab:
111 112 print >> sys.stderr, ab.code
112 113 else:
113 114 print >> sys.stderr, ab
114 115
115 116 def recv_reply(self):
116 117 rep = self.session.recv(self.request_socket)
117 118 self.handle_reply(rep)
118 119 return rep
119 120
120 121 def runcode(self, code):
121 122 # We can't pickle code objects, so fetch the actual source
122 123 src = '\n'.join(self.buffer)
123 124
124 125 # for non-background inputs, if we do have previoiusly backgrounded
125 126 # jobs, check to see if they've produced results
126 127 if not src.endswith(';'):
127 128 while self.backgrounded > 0:
128 129 #print 'checking background'
129 130 rep = self.recv_reply()
130 131 if rep:
131 132 self.backgrounded -= 1
132 133 time.sleep(0.05)
133 134
134 135 # Send code execution message to kernel
135 136 omsg = self.session.send(self.request_socket,
136 137 'execute_request', dict(code=src))
137 138 self.messages[omsg.header.msg_id] = omsg
138 139
139 140 # Fake asynchronicity by letting the user put ';' at the end of the line
140 141 if src.endswith(';'):
141 142 self.backgrounded += 1
142 143 return
143 144
144 145 # For foreground jobs, wait for reply
145 146 while True:
146 147 rep = self.recv_reply()
147 148 if rep is not None:
148 149 break
149 150 self.recv_output()
150 151 time.sleep(0.05)
151 152 else:
152 153 # We exited without hearing back from the kernel!
153 154 print >> sys.stderr, 'ERROR!!! kernel never got back to us!!!'
154 155
155 156
156 157 class InteractiveClient(object):
157 158 def __init__(self, session, request_socket, sub_socket):
158 159 self.session = session
159 160 self.request_socket = request_socket
160 161 self.sub_socket = sub_socket
161 162 self.console = Console(None, '<zmq-console>',
162 163 session, request_socket, sub_socket)
163 164
164 165 def interact(self):
165 166 self.console.interact()
166 167
167 168
168 169 def main():
169 170 # Defaults
170 171 #ip = '192.168.2.109'
171 ip = '127.0.0.1'
172 ip = LOCALHOST
172 173 #ip = '99.146.222.252'
173 174 port_base = 5575
174 175 connection = ('tcp://%s' % ip) + ':%i'
175 176 req_conn = connection % port_base
176 177 sub_conn = connection % (port_base+1)
177 178
178 179 # Create initial sockets
179 180 c = zmq.Context()
180 181 request_socket = c.socket(zmq.XREQ)
181 182 request_socket.connect(req_conn)
182 183
183 184 sub_socket = c.socket(zmq.SUB)
184 185 sub_socket.connect(sub_conn)
185 186 sub_socket.setsockopt(zmq.SUBSCRIBE, '')
186 187
187 188 # Make session and user-facing client
188 189 sess = session.Session()
189 190 client = InteractiveClient(sess, request_socket, sub_socket)
190 191 client.interact()
191 192
192 193
193 194 if __name__ == '__main__':
194 195 main()
@@ -1,43 +1,45 b''
1 1 """The client and server for a basic ping-pong style heartbeat.
2 2 """
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2008-2010 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 15 import sys
16 16 from threading import Thread
17 17
18 18 import zmq
19 19
20 from IPython.utils.localinterfaces import LOCALHOST
21
20 22 #-----------------------------------------------------------------------------
21 23 # Code
22 24 #-----------------------------------------------------------------------------
23 25
24 26
25 27 class Heartbeat(Thread):
26 28 "A simple ping-pong style heartbeat that runs in a thread."
27 29
28 def __init__(self, context, addr=('127.0.0.1', 0)):
30 def __init__(self, context, addr=(LOCALHOST, 0)):
29 31 Thread.__init__(self)
30 32 self.context = context
31 33 self.addr = addr
32 34 self.ip = addr[0]
33 35 self.port = addr[1]
34 36 self.daemon = True
35 37
36 38 def run(self):
37 39 self.socket = self.context.socket(zmq.REP)
38 40 if self.port == 0:
39 41 self.port = self.socket.bind_to_random_port('tcp://%s' % self.ip)
40 42 else:
41 43 self.socket.bind('tcp://%s:%i' % self.addr)
42 44 zmq.device(zmq.FORWARDER, self.socket, self.socket)
43 45
@@ -1,623 +1,630 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports.
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24
25 25 # System library imports.
26 26 import zmq
27 27
28 28 # Local imports.
29 29 from IPython.config.configurable import Configurable
30 30 from IPython.utils import io
31 31 from IPython.utils.jsonutil import json_clean
32 32 from IPython.lib import pylabtools
33 33 from IPython.utils.traitlets import Instance, Float
34 34 from entry_point import (base_launch_kernel, make_argument_parser, make_kernel,
35 35 start_kernel)
36 36 from iostream import OutStream
37 37 from session import Session, Message
38 38 from zmqshell import ZMQInteractiveShell
39 39
40 40 #-----------------------------------------------------------------------------
41 41 # Main kernel class
42 42 #-----------------------------------------------------------------------------
43 43
44 44 class Kernel(Configurable):
45 45
46 46 #---------------------------------------------------------------------------
47 47 # Kernel interface
48 48 #---------------------------------------------------------------------------
49 49
50 50 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
51 51 session = Instance(Session)
52 52 reply_socket = Instance('zmq.Socket')
53 53 pub_socket = Instance('zmq.Socket')
54 54 req_socket = Instance('zmq.Socket')
55 55
56 56 # Private interface
57 57
58 58 # Time to sleep after flushing the stdout/err buffers in each execute
59 59 # cycle. While this introduces a hard limit on the minimal latency of the
60 60 # execute cycle, it helps prevent output synchronization problems for
61 61 # clients.
62 62 # Units are in seconds. The minimum zmq latency on local host is probably
63 63 # ~150 microseconds, set this to 500us for now. We may need to increase it
64 64 # a little if it's not enough after more interactive testing.
65 65 _execute_sleep = Float(0.0005, config=True)
66 66
67 67 # Frequency of the kernel's event loop.
68 68 # Units are in seconds, kernel subclasses for GUI toolkits may need to
69 69 # adapt to milliseconds.
70 70 _poll_interval = Float(0.05, config=True)
71 71
72 72 # If the shutdown was requested over the network, we leave here the
73 73 # necessary reply message so it can be sent by our registered atexit
74 74 # handler. This ensures that the reply is only sent to clients truly at
75 75 # the end of our shutdown process (which happens after the underlying
76 76 # IPython shell's own shutdown).
77 77 _shutdown_message = None
78 78
79 79 # This is a dict of port number that the kernel is listening on. It is set
80 80 # by record_ports and used by connect_request.
81 81 _recorded_ports = None
82 82
83 83 def __init__(self, **kwargs):
84 84 super(Kernel, self).__init__(**kwargs)
85 85
86 86 # Before we even start up the shell, register *first* our exit handlers
87 87 # so they come before the shell's
88 88 atexit.register(self._at_shutdown)
89 89
90 90 # Initialize the InteractiveShell subclass
91 91 self.shell = ZMQInteractiveShell.instance()
92 92 self.shell.displayhook.session = self.session
93 93 self.shell.displayhook.pub_socket = self.pub_socket
94 94
95 95 # TMP - hack while developing
96 96 self.shell._reply_content = None
97 97
98 98 # Build dict of handlers for message types
99 99 msg_types = [ 'execute_request', 'complete_request',
100 100 'object_info_request', 'history_request',
101 101 'connect_request', 'shutdown_request']
102 102 self.handlers = {}
103 103 for msg_type in msg_types:
104 104 self.handlers[msg_type] = getattr(self, msg_type)
105 105
106 106 def do_one_iteration(self):
107 107 """Do one iteration of the kernel's evaluation loop.
108 108 """
109 109 try:
110 110 ident = self.reply_socket.recv(zmq.NOBLOCK)
111 111 except zmq.ZMQError, e:
112 112 if e.errno == zmq.EAGAIN:
113 113 return
114 114 else:
115 115 raise
116 116 # This assert will raise in versions of zeromq 2.0.7 and lesser.
117 117 # We now require 2.0.8 or above, so we can uncomment for safety.
118 118 assert self.reply_socket.rcvmore(), "Missing message part."
119 119 msg = self.reply_socket.recv_json()
120 120
121 121 # Print some info about this message and leave a '--->' marker, so it's
122 122 # easier to trace visually the message chain when debugging. Each
123 123 # handler prints its message at the end.
124 124 # Eventually we'll move these from stdout to a logger.
125 125 io.raw_print('\n*** MESSAGE TYPE:', msg['msg_type'], '***')
126 126 io.raw_print(' Content: ', msg['content'],
127 127 '\n --->\n ', sep='', end='')
128 128
129 129 # Find and call actual handler for message
130 130 handler = self.handlers.get(msg['msg_type'], None)
131 131 if handler is None:
132 132 io.raw_print_err("UNKNOWN MESSAGE TYPE:", msg)
133 133 else:
134 134 handler(ident, msg)
135 135
136 136 # Check whether we should exit, in case the incoming message set the
137 137 # exit flag on
138 138 if self.shell.exit_now:
139 139 io.raw_print('\nExiting IPython kernel...')
140 140 # We do a normal, clean exit, which allows any actions registered
141 141 # via atexit (such as history saving) to take place.
142 142 sys.exit(0)
143 143
144 144
145 145 def start(self):
146 146 """ Start the kernel main loop.
147 147 """
148 148 while True:
149 149 time.sleep(self._poll_interval)
150 150 self.do_one_iteration()
151 151
152 152 def record_ports(self, xrep_port, pub_port, req_port, hb_port):
153 153 """Record the ports that this kernel is using.
154 154
155 155 The creator of the Kernel instance must call this methods if they
156 156 want the :meth:`connect_request` method to return the port numbers.
157 157 """
158 158 self._recorded_ports = {
159 159 'xrep_port' : xrep_port,
160 160 'pub_port' : pub_port,
161 161 'req_port' : req_port,
162 162 'hb_port' : hb_port
163 163 }
164 164
165 165 #---------------------------------------------------------------------------
166 166 # Kernel request handlers
167 167 #---------------------------------------------------------------------------
168 168
169 169 def _publish_pyin(self, code, parent):
170 170 """Publish the code request on the pyin stream."""
171 171
172 172 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
173 173 self.pub_socket.send_json(pyin_msg)
174 174
175 175 def execute_request(self, ident, parent):
176 176
177 177 status_msg = self.session.msg(
178 178 u'status',
179 179 {u'execution_state':u'busy'},
180 180 parent=parent
181 181 )
182 182 self.pub_socket.send_json(status_msg)
183 183
184 184 try:
185 185 content = parent[u'content']
186 186 code = content[u'code']
187 187 silent = content[u'silent']
188 188 except:
189 189 io.raw_print_err("Got bad msg: ")
190 190 io.raw_print_err(Message(parent))
191 191 return
192 192
193 193 shell = self.shell # we'll need this a lot here
194 194
195 195 # Replace raw_input. Note that is not sufficient to replace
196 196 # raw_input in the user namespace.
197 197 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
198 198 __builtin__.raw_input = raw_input
199 199
200 200 # Set the parent message of the display hook and out streams.
201 201 shell.displayhook.set_parent(parent)
202 202 sys.stdout.set_parent(parent)
203 203 sys.stderr.set_parent(parent)
204 204
205 205 # Re-broadcast our input for the benefit of listening clients, and
206 206 # start computing output
207 207 if not silent:
208 208 self._publish_pyin(code, parent)
209 209
210 210 reply_content = {}
211 211 try:
212 212 if silent:
213 213 # run_code uses 'exec' mode, so no displayhook will fire, and it
214 214 # doesn't call logging or history manipulations. Print
215 215 # statements in that code will obviously still execute.
216 216 shell.run_code(code)
217 217 else:
218 218 # FIXME: the shell calls the exception handler itself.
219 219 shell._reply_content = None
220 220 shell.run_cell(code)
221 221 except:
222 222 status = u'error'
223 223 # FIXME: this code right now isn't being used yet by default,
224 224 # because the runlines() call above directly fires off exception
225 225 # reporting. This code, therefore, is only active in the scenario
226 226 # where runlines itself has an unhandled exception. We need to
227 227 # uniformize this, for all exception construction to come from a
228 228 # single location in the codbase.
229 229 etype, evalue, tb = sys.exc_info()
230 230 tb_list = traceback.format_exception(etype, evalue, tb)
231 231 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
232 232 else:
233 233 status = u'ok'
234 234
235 235 reply_content[u'status'] = status
236 236
237 237 # Return the execution counter so clients can display prompts
238 238 reply_content['execution_count'] = shell.execution_count -1
239 239
240 240 # FIXME - fish exception info out of shell, possibly left there by
241 241 # runlines. We'll need to clean up this logic later.
242 242 if shell._reply_content is not None:
243 243 reply_content.update(shell._reply_content)
244 244
245 245 # At this point, we can tell whether the main code execution succeeded
246 246 # or not. If it did, we proceed to evaluate user_variables/expressions
247 247 if reply_content['status'] == 'ok':
248 248 reply_content[u'user_variables'] = \
249 249 shell.user_variables(content[u'user_variables'])
250 250 reply_content[u'user_expressions'] = \
251 251 shell.user_expressions(content[u'user_expressions'])
252 252 else:
253 253 # If there was an error, don't even try to compute variables or
254 254 # expressions
255 255 reply_content[u'user_variables'] = {}
256 256 reply_content[u'user_expressions'] = {}
257 257
258 258 # Payloads should be retrieved regardless of outcome, so we can both
259 259 # recover partial output (that could have been generated early in a
260 260 # block, before an error) and clear the payload system always.
261 261 reply_content[u'payload'] = shell.payload_manager.read_payload()
262 262 # Be agressive about clearing the payload because we don't want
263 263 # it to sit in memory until the next execute_request comes in.
264 264 shell.payload_manager.clear_payload()
265 265
266 266 # Send the reply.
267 267 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
268 268 io.raw_print(reply_msg)
269 269
270 270 # Flush output before sending the reply.
271 271 sys.stdout.flush()
272 272 sys.stderr.flush()
273 273 # FIXME: on rare occasions, the flush doesn't seem to make it to the
274 274 # clients... This seems to mitigate the problem, but we definitely need
275 275 # to better understand what's going on.
276 276 if self._execute_sleep:
277 277 time.sleep(self._execute_sleep)
278 278
279 279 self.reply_socket.send(ident, zmq.SNDMORE)
280 280 self.reply_socket.send_json(reply_msg)
281 281 if reply_msg['content']['status'] == u'error':
282 282 self._abort_queue()
283 283
284 284 status_msg = self.session.msg(
285 285 u'status',
286 286 {u'execution_state':u'idle'},
287 287 parent=parent
288 288 )
289 289 self.pub_socket.send_json(status_msg)
290 290
291 291 def complete_request(self, ident, parent):
292 292 txt, matches = self._complete(parent)
293 293 matches = {'matches' : matches,
294 294 'matched_text' : txt,
295 295 'status' : 'ok'}
296 296 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
297 297 matches, parent, ident)
298 298 io.raw_print(completion_msg)
299 299
300 300 def object_info_request(self, ident, parent):
301 301 object_info = self.shell.object_inspect(parent['content']['oname'])
302 302 # Before we send this object over, we scrub it for JSON usage
303 303 oinfo = json_clean(object_info)
304 304 msg = self.session.send(self.reply_socket, 'object_info_reply',
305 305 oinfo, parent, ident)
306 306 io.raw_print(msg)
307 307
308 308 def history_request(self, ident, parent):
309 309 output = parent['content']['output']
310 310 index = parent['content']['index']
311 311 raw = parent['content']['raw']
312 312 hist = self.shell.get_history(index=index, raw=raw, output=output)
313 313 content = {'history' : hist}
314 314 msg = self.session.send(self.reply_socket, 'history_reply',
315 315 content, parent, ident)
316 316 io.raw_print(msg)
317 317
318 318 def connect_request(self, ident, parent):
319 319 if self._recorded_ports is not None:
320 320 content = self._recorded_ports.copy()
321 321 else:
322 322 content = {}
323 323 msg = self.session.send(self.reply_socket, 'connect_reply',
324 324 content, parent, ident)
325 325 io.raw_print(msg)
326 326
327 327 def shutdown_request(self, ident, parent):
328 328 self.shell.exit_now = True
329 329 self._shutdown_message = self.session.msg(u'shutdown_reply', parent['content'], parent)
330 330 sys.exit(0)
331 331
332 332 #---------------------------------------------------------------------------
333 333 # Protected interface
334 334 #---------------------------------------------------------------------------
335 335
336 336 def _abort_queue(self):
337 337 while True:
338 338 try:
339 339 ident = self.reply_socket.recv(zmq.NOBLOCK)
340 340 except zmq.ZMQError, e:
341 341 if e.errno == zmq.EAGAIN:
342 342 break
343 343 else:
344 344 assert self.reply_socket.rcvmore(), \
345 345 "Unexpected missing message part."
346 346 msg = self.reply_socket.recv_json()
347 347 io.raw_print("Aborting:\n", Message(msg))
348 348 msg_type = msg['msg_type']
349 349 reply_type = msg_type.split('_')[0] + '_reply'
350 350 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
351 351 io.raw_print(reply_msg)
352 352 self.reply_socket.send(ident,zmq.SNDMORE)
353 353 self.reply_socket.send_json(reply_msg)
354 354 # We need to wait a bit for requests to come in. This can probably
355 355 # be set shorter for true asynchronous clients.
356 356 time.sleep(0.1)
357 357
358 358 def _raw_input(self, prompt, ident, parent):
359 359 # Flush output before making the request.
360 360 sys.stderr.flush()
361 361 sys.stdout.flush()
362 362
363 363 # Send the input request.
364 364 content = dict(prompt=prompt)
365 365 msg = self.session.msg(u'input_request', content, parent)
366 366 self.req_socket.send_json(msg)
367 367
368 368 # Await a response.
369 369 reply = self.req_socket.recv_json()
370 370 try:
371 371 value = reply['content']['value']
372 372 except:
373 373 io.raw_print_err("Got bad raw_input reply: ")
374 374 io.raw_print_err(Message(parent))
375 375 value = ''
376 376 return value
377 377
378 378 def _complete(self, msg):
379 379 c = msg['content']
380 380 try:
381 381 cpos = int(c['cursor_pos'])
382 382 except:
383 383 # If we don't get something that we can convert to an integer, at
384 384 # least attempt the completion guessing the cursor is at the end of
385 385 # the text, if there's any, and otherwise of the line
386 386 cpos = len(c['text'])
387 387 if cpos==0:
388 388 cpos = len(c['line'])
389 389 return self.shell.complete(c['text'], c['line'], cpos)
390 390
391 391 def _object_info(self, context):
392 392 symbol, leftover = self._symbol_from_context(context)
393 393 if symbol is not None and not leftover:
394 394 doc = getattr(symbol, '__doc__', '')
395 395 else:
396 396 doc = ''
397 397 object_info = dict(docstring = doc)
398 398 return object_info
399 399
400 400 def _symbol_from_context(self, context):
401 401 if not context:
402 402 return None, context
403 403
404 404 base_symbol_string = context[0]
405 405 symbol = self.shell.user_ns.get(base_symbol_string, None)
406 406 if symbol is None:
407 407 symbol = __builtin__.__dict__.get(base_symbol_string, None)
408 408 if symbol is None:
409 409 return None, context
410 410
411 411 context = context[1:]
412 412 for i, name in enumerate(context):
413 413 new_symbol = getattr(symbol, name, None)
414 414 if new_symbol is None:
415 415 return symbol, context[i:]
416 416 else:
417 417 symbol = new_symbol
418 418
419 419 return symbol, []
420 420
421 421 def _at_shutdown(self):
422 422 """Actions taken at shutdown by the kernel, called by python's atexit.
423 423 """
424 424 # io.rprint("Kernel at_shutdown") # dbg
425 425 if self._shutdown_message is not None:
426 426 self.reply_socket.send_json(self._shutdown_message)
427 427 self.pub_socket.send_json(self._shutdown_message)
428 428 io.raw_print(self._shutdown_message)
429 429 # A very short sleep to give zmq time to flush its message buffers
430 430 # before Python truly shuts down.
431 431 time.sleep(0.01)
432 432
433 433
434 434 class QtKernel(Kernel):
435 435 """A Kernel subclass with Qt support."""
436 436
437 437 def start(self):
438 438 """Start a kernel with QtPy4 event loop integration."""
439 439
440 440 from PyQt4 import QtCore
441 441 from IPython.lib.guisupport import get_app_qt4, start_event_loop_qt4
442 442
443 443 self.app = get_app_qt4([" "])
444 444 self.app.setQuitOnLastWindowClosed(False)
445 445 self.timer = QtCore.QTimer()
446 446 self.timer.timeout.connect(self.do_one_iteration)
447 447 # Units for the timer are in milliseconds
448 448 self.timer.start(1000*self._poll_interval)
449 449 start_event_loop_qt4(self.app)
450 450
451 451
452 452 class WxKernel(Kernel):
453 453 """A Kernel subclass with Wx support."""
454 454
455 455 def start(self):
456 456 """Start a kernel with wx event loop support."""
457 457
458 458 import wx
459 459 from IPython.lib.guisupport import start_event_loop_wx
460 460
461 461 doi = self.do_one_iteration
462 462 # Wx uses milliseconds
463 463 poll_interval = int(1000*self._poll_interval)
464 464
465 465 # We have to put the wx.Timer in a wx.Frame for it to fire properly.
466 466 # We make the Frame hidden when we create it in the main app below.
467 467 class TimerFrame(wx.Frame):
468 468 def __init__(self, func):
469 469 wx.Frame.__init__(self, None, -1)
470 470 self.timer = wx.Timer(self)
471 471 # Units for the timer are in milliseconds
472 472 self.timer.Start(poll_interval)
473 473 self.Bind(wx.EVT_TIMER, self.on_timer)
474 474 self.func = func
475 475
476 476 def on_timer(self, event):
477 477 self.func()
478 478
479 479 # We need a custom wx.App to create our Frame subclass that has the
480 480 # wx.Timer to drive the ZMQ event loop.
481 481 class IPWxApp(wx.App):
482 482 def OnInit(self):
483 483 self.frame = TimerFrame(doi)
484 484 self.frame.Show(False)
485 485 return True
486 486
487 487 # The redirect=False here makes sure that wx doesn't replace
488 488 # sys.stdout/stderr with its own classes.
489 489 self.app = IPWxApp(redirect=False)
490 490 start_event_loop_wx(self.app)
491 491
492 492
493 493 class TkKernel(Kernel):
494 494 """A Kernel subclass with Tk support."""
495 495
496 496 def start(self):
497 497 """Start a Tk enabled event loop."""
498 498
499 499 import Tkinter
500 500 doi = self.do_one_iteration
501 501 # Tk uses milliseconds
502 502 poll_interval = int(1000*self._poll_interval)
503 503 # For Tkinter, we create a Tk object and call its withdraw method.
504 504 class Timer(object):
505 505 def __init__(self, func):
506 506 self.app = Tkinter.Tk()
507 507 self.app.withdraw()
508 508 self.func = func
509 509
510 510 def on_timer(self):
511 511 self.func()
512 512 self.app.after(poll_interval, self.on_timer)
513 513
514 514 def start(self):
515 515 self.on_timer() # Call it once to get things going.
516 516 self.app.mainloop()
517 517
518 518 self.timer = Timer(doi)
519 519 self.timer.start()
520 520
521 521
522 522 class GTKKernel(Kernel):
523 523 """A Kernel subclass with GTK support."""
524 524
525 525 def start(self):
526 526 """Start the kernel, coordinating with the GTK event loop"""
527 527 from .gui.gtkembed import GTKEmbed
528 528
529 529 gtk_kernel = GTKEmbed(self)
530 530 gtk_kernel.start()
531 531
532 532
533 533 #-----------------------------------------------------------------------------
534 534 # Kernel main and launch functions
535 535 #-----------------------------------------------------------------------------
536 536
537 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, hb_port=0,
537 def launch_kernel(ip=None, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
538 538 independent=False, pylab=False):
539 539 """Launches a localhost kernel, binding to the specified ports.
540 540
541 541 Parameters
542 542 ----------
543 ip : str, optional
544 The ip address the kernel will bind to.
545
543 546 xrep_port : int, optional
544 547 The port to use for XREP channel.
545 548
546 549 pub_port : int, optional
547 550 The port to use for the SUB channel.
548 551
549 552 req_port : int, optional
550 553 The port to use for the REQ (raw input) channel.
551 554
552 555 hb_port : int, optional
553 556 The port to use for the hearbeat REP channel.
554 557
555 558 independent : bool, optional (default False)
556 559 If set, the kernel process is guaranteed to survive if this process
557 560 dies. If not set, an effort is made to ensure that the kernel is killed
558 561 when this process dies. Note that in this case it is still good practice
559 562 to kill kernels manually before exiting.
560 563
561 564 pylab : bool or string, optional (default False)
562 565 If not False, the kernel will be launched with pylab enabled. If a
563 566 string is passed, matplotlib will use the specified backend. Otherwise,
564 567 matplotlib's default backend will be used.
565 568
566 569 Returns
567 570 -------
568 571 A tuple of form:
569 572 (kernel_process, xrep_port, pub_port, req_port)
570 573 where kernel_process is a Popen object and the ports are integers.
571 574 """
572 575 extra_arguments = []
573 576 if pylab:
574 577 extra_arguments.append('--pylab')
575 578 if isinstance(pylab, basestring):
576 579 extra_arguments.append(pylab)
580 if ip is not None:
581 extra_arguments.append('--ip')
582 if isinstance(ip, basestring):
583 extra_arguments.append(ip)
577 584 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
578 585 xrep_port, pub_port, req_port, hb_port,
579 586 independent, extra_arguments)
580 587
581 588
582 589 def main():
583 590 """ The IPython kernel main entry point.
584 591 """
585 592 parser = make_argument_parser()
586 593 parser.add_argument('--pylab', type=str, metavar='GUI', nargs='?',
587 594 const='auto', help = \
588 595 "Pre-load matplotlib and numpy for interactive use. If GUI is not \
589 596 given, the GUI backend is matplotlib's, otherwise use one of: \
590 597 ['tk', 'gtk', 'qt', 'wx', 'inline'].")
591 598 namespace = parser.parse_args()
592 599
593 600 kernel_class = Kernel
594 601
595 602 kernel_classes = {
596 603 'qt' : QtKernel,
597 604 'qt4': QtKernel,
598 605 'inline': Kernel,
599 606 'wx' : WxKernel,
600 607 'tk' : TkKernel,
601 608 'gtk': GTKKernel,
602 609 }
603 610 if namespace.pylab:
604 611 if namespace.pylab == 'auto':
605 612 gui, backend = pylabtools.find_gui_and_backend()
606 613 else:
607 614 gui, backend = pylabtools.find_gui_and_backend(namespace.pylab)
608 615 kernel_class = kernel_classes.get(gui)
609 616 if kernel_class is None:
610 617 raise ValueError('GUI is not supported: %r' % gui)
611 618 pylabtools.activate_matplotlib(backend)
612 619
613 620 kernel = make_kernel(namespace, kernel_class, OutStream)
614 621
615 622 if namespace.pylab:
616 623 pylabtools.import_pylab(kernel.shell.user_ns, backend,
617 624 shell=kernel.shell)
618 625
619 626 start_kernel(namespace, kernel)
620 627
621 628
622 629 if __name__ == '__main__':
623 630 main()
@@ -1,905 +1,906 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2010 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 from Queue import Queue, Empty
21 21 from subprocess import Popen
22 22 import signal
23 23 import sys
24 24 from threading import Thread
25 25 import time
26 26
27 27 # System library imports.
28 28 import zmq
29 29 from zmq import POLLIN, POLLOUT, POLLERR
30 30 from zmq.eventloop import ioloop
31 31
32 32 # Local imports.
33 33 from IPython.utils import io
34 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
34 35 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
35 36 from session import Session
36 37
37 38 #-----------------------------------------------------------------------------
38 39 # Constants and exceptions
39 40 #-----------------------------------------------------------------------------
40 41
41 LOCALHOST = '127.0.0.1'
42
43 42 class InvalidPortNumber(Exception):
44 43 pass
45 44
46 45 #-----------------------------------------------------------------------------
47 46 # Utility functions
48 47 #-----------------------------------------------------------------------------
49 48
50 49 # some utilities to validate message structure, these might get moved elsewhere
51 50 # if they prove to have more generic utility
52 51
53 52 def validate_string_list(lst):
54 53 """Validate that the input is a list of strings.
55 54
56 55 Raises ValueError if not."""
57 56 if not isinstance(lst, list):
58 57 raise ValueError('input %r must be a list' % lst)
59 58 for x in lst:
60 59 if not isinstance(x, basestring):
61 60 raise ValueError('element %r in list must be a string' % x)
62 61
63 62
64 63 def validate_string_dict(dct):
65 64 """Validate that the input is a dict with string keys and values.
66 65
67 66 Raises ValueError if not."""
68 67 for k,v in dct.iteritems():
69 68 if not isinstance(k, basestring):
70 69 raise ValueError('key %r in dict must be a string' % k)
71 70 if not isinstance(v, basestring):
72 71 raise ValueError('value %r in dict must be a string' % v)
73 72
74 73
75 74 #-----------------------------------------------------------------------------
76 75 # ZMQ Socket Channel classes
77 76 #-----------------------------------------------------------------------------
78 77
79 78 class ZmqSocketChannel(Thread):
80 79 """The base class for the channels that use ZMQ sockets.
81 80 """
82 81 context = None
83 82 session = None
84 83 socket = None
85 84 ioloop = None
86 85 iostate = None
87 86 _address = None
88 87
89 88 def __init__(self, context, session, address):
90 89 """Create a channel
91 90
92 91 Parameters
93 92 ----------
94 93 context : :class:`zmq.Context`
95 94 The ZMQ context to use.
96 95 session : :class:`session.Session`
97 96 The session to use.
98 97 address : tuple
99 98 Standard (ip, port) tuple that the kernel is listening on.
100 99 """
101 100 super(ZmqSocketChannel, self).__init__()
102 101 self.daemon = True
103 102
104 103 self.context = context
105 104 self.session = session
106 105 if address[1] == 0:
107 106 message = 'The port number for a channel cannot be 0.'
108 107 raise InvalidPortNumber(message)
109 108 self._address = address
110 109
111 110 def stop(self):
112 111 """Stop the channel's activity.
113 112
114 113 This calls :method:`Thread.join` and returns when the thread
115 114 terminates. :class:`RuntimeError` will be raised if
116 115 :method:`self.start` is called again.
117 116 """
118 117 self.join()
119 118
120 119 @property
121 120 def address(self):
122 121 """Get the channel's address as an (ip, port) tuple.
123 122
124 123 By the default, the address is (localhost, 0), where 0 means a random
125 124 port.
126 125 """
127 126 return self._address
128 127
129 128 def add_io_state(self, state):
130 129 """Add IO state to the eventloop.
131 130
132 131 Parameters
133 132 ----------
134 133 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
135 134 The IO state flag to set.
136 135
137 136 This is thread safe as it uses the thread safe IOLoop.add_callback.
138 137 """
139 138 def add_io_state_callback():
140 139 if not self.iostate & state:
141 140 self.iostate = self.iostate | state
142 141 self.ioloop.update_handler(self.socket, self.iostate)
143 142 self.ioloop.add_callback(add_io_state_callback)
144 143
145 144 def drop_io_state(self, state):
146 145 """Drop IO state from the eventloop.
147 146
148 147 Parameters
149 148 ----------
150 149 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
151 150 The IO state flag to set.
152 151
153 152 This is thread safe as it uses the thread safe IOLoop.add_callback.
154 153 """
155 154 def drop_io_state_callback():
156 155 if self.iostate & state:
157 156 self.iostate = self.iostate & (~state)
158 157 self.ioloop.update_handler(self.socket, self.iostate)
159 158 self.ioloop.add_callback(drop_io_state_callback)
160 159
161 160
162 161 class XReqSocketChannel(ZmqSocketChannel):
163 162 """The XREQ channel for issues request/replies to the kernel.
164 163 """
165 164
166 165 command_queue = None
167 166
168 167 def __init__(self, context, session, address):
169 168 super(XReqSocketChannel, self).__init__(context, session, address)
170 169 self.command_queue = Queue()
171 170 self.ioloop = ioloop.IOLoop()
172 171
173 172 def run(self):
174 173 """The thread's main activity. Call start() instead."""
175 174 self.socket = self.context.socket(zmq.XREQ)
176 175 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
177 176 self.socket.connect('tcp://%s:%i' % self.address)
178 177 self.iostate = POLLERR|POLLIN
179 178 self.ioloop.add_handler(self.socket, self._handle_events,
180 179 self.iostate)
181 180 self.ioloop.start()
182 181
183 182 def stop(self):
184 183 self.ioloop.stop()
185 184 super(XReqSocketChannel, self).stop()
186 185
187 186 def call_handlers(self, msg):
188 187 """This method is called in the ioloop thread when a message arrives.
189 188
190 189 Subclasses should override this method to handle incoming messages.
191 190 It is important to remember that this method is called in the thread
192 191 so that some logic must be done to ensure that the application leve
193 192 handlers are called in the application thread.
194 193 """
195 194 raise NotImplementedError('call_handlers must be defined in a subclass.')
196 195
197 196 def execute(self, code, silent=False,
198 197 user_variables=None, user_expressions=None):
199 198 """Execute code in the kernel.
200 199
201 200 Parameters
202 201 ----------
203 202 code : str
204 203 A string of Python code.
205 204
206 205 silent : bool, optional (default False)
207 206 If set, the kernel will execute the code as quietly possible.
208 207
209 208 user_variables : list, optional
210 209 A list of variable names to pull from the user's namespace. They
211 210 will come back as a dict with these names as keys and their
212 211 :func:`repr` as values.
213 212
214 213 user_expressions : dict, optional
215 214 A dict with string keys and to pull from the user's
216 215 namespace. They will come back as a dict with these names as keys
217 216 and their :func:`repr` as values.
218 217
219 218 Returns
220 219 -------
221 220 The msg_id of the message sent.
222 221 """
223 222 if user_variables is None:
224 223 user_variables = []
225 224 if user_expressions is None:
226 225 user_expressions = {}
227 226
228 227 # Don't waste network traffic if inputs are invalid
229 228 if not isinstance(code, basestring):
230 229 raise ValueError('code %r must be a string' % code)
231 230 validate_string_list(user_variables)
232 231 validate_string_dict(user_expressions)
233 232
234 233 # Create class for content/msg creation. Related to, but possibly
235 234 # not in Session.
236 235 content = dict(code=code, silent=silent,
237 236 user_variables=user_variables,
238 237 user_expressions=user_expressions)
239 238 msg = self.session.msg('execute_request', content)
240 239 self._queue_request(msg)
241 240 return msg['header']['msg_id']
242 241
243 242 def complete(self, text, line, cursor_pos, block=None):
244 243 """Tab complete text in the kernel's namespace.
245 244
246 245 Parameters
247 246 ----------
248 247 text : str
249 248 The text to complete.
250 249 line : str
251 250 The full line of text that is the surrounding context for the
252 251 text to complete.
253 252 cursor_pos : int
254 253 The position of the cursor in the line where the completion was
255 254 requested.
256 255 block : str, optional
257 256 The full block of code in which the completion is being requested.
258 257
259 258 Returns
260 259 -------
261 260 The msg_id of the message sent.
262 261 """
263 262 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
264 263 msg = self.session.msg('complete_request', content)
265 264 self._queue_request(msg)
266 265 return msg['header']['msg_id']
267 266
268 267 def object_info(self, oname):
269 268 """Get metadata information about an object.
270 269
271 270 Parameters
272 271 ----------
273 272 oname : str
274 273 A string specifying the object name.
275 274
276 275 Returns
277 276 -------
278 277 The msg_id of the message sent.
279 278 """
280 279 content = dict(oname=oname)
281 280 msg = self.session.msg('object_info_request', content)
282 281 self._queue_request(msg)
283 282 return msg['header']['msg_id']
284 283
285 284 def history(self, index=None, raw=False, output=True):
286 285 """Get the history list.
287 286
288 287 Parameters
289 288 ----------
290 289 index : n or (n1, n2) or None
291 290 If n, then the last entries. If a tuple, then all in
292 291 range(n1, n2). If None, then all entries. Raises IndexError if
293 292 the format of index is incorrect.
294 293 raw : bool
295 294 If True, return the raw input.
296 295 output : bool
297 296 If True, then return the output as well.
298 297
299 298 Returns
300 299 -------
301 300 The msg_id of the message sent.
302 301 """
303 302 content = dict(index=index, raw=raw, output=output)
304 303 msg = self.session.msg('history_request', content)
305 304 self._queue_request(msg)
306 305 return msg['header']['msg_id']
307 306
308 307 def shutdown(self, restart=False):
309 308 """Request an immediate kernel shutdown.
310 309
311 310 Upon receipt of the (empty) reply, client code can safely assume that
312 311 the kernel has shut down and it's safe to forcefully terminate it if
313 312 it's still alive.
314 313
315 314 The kernel will send the reply via a function registered with Python's
316 315 atexit module, ensuring it's truly done as the kernel is done with all
317 316 normal operation.
318 317 """
319 318 # Send quit message to kernel. Once we implement kernel-side setattr,
320 319 # this should probably be done that way, but for now this will do.
321 320 msg = self.session.msg('shutdown_request', {'restart':restart})
322 321 self._queue_request(msg)
323 322 return msg['header']['msg_id']
324 323
325 324 def _handle_events(self, socket, events):
326 325 if events & POLLERR:
327 326 self._handle_err()
328 327 if events & POLLOUT:
329 328 self._handle_send()
330 329 if events & POLLIN:
331 330 self._handle_recv()
332 331
333 332 def _handle_recv(self):
334 333 msg = self.socket.recv_json()
335 334 self.call_handlers(msg)
336 335
337 336 def _handle_send(self):
338 337 try:
339 338 msg = self.command_queue.get(False)
340 339 except Empty:
341 340 pass
342 341 else:
343 342 self.socket.send_json(msg)
344 343 if self.command_queue.empty():
345 344 self.drop_io_state(POLLOUT)
346 345
347 346 def _handle_err(self):
348 347 # We don't want to let this go silently, so eventually we should log.
349 348 raise zmq.ZMQError()
350 349
351 350 def _queue_request(self, msg):
352 351 self.command_queue.put(msg)
353 352 self.add_io_state(POLLOUT)
354 353
355 354
356 355 class SubSocketChannel(ZmqSocketChannel):
357 356 """The SUB channel which listens for messages that the kernel publishes.
358 357 """
359 358
360 359 def __init__(self, context, session, address):
361 360 super(SubSocketChannel, self).__init__(context, session, address)
362 361 self.ioloop = ioloop.IOLoop()
363 362
364 363 def run(self):
365 364 """The thread's main activity. Call start() instead."""
366 365 self.socket = self.context.socket(zmq.SUB)
367 366 self.socket.setsockopt(zmq.SUBSCRIBE,'')
368 367 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
369 368 self.socket.connect('tcp://%s:%i' % self.address)
370 369 self.iostate = POLLIN|POLLERR
371 370 self.ioloop.add_handler(self.socket, self._handle_events,
372 371 self.iostate)
373 372 self.ioloop.start()
374 373
375 374 def stop(self):
376 375 self.ioloop.stop()
377 376 super(SubSocketChannel, self).stop()
378 377
379 378 def call_handlers(self, msg):
380 379 """This method is called in the ioloop thread when a message arrives.
381 380
382 381 Subclasses should override this method to handle incoming messages.
383 382 It is important to remember that this method is called in the thread
384 383 so that some logic must be done to ensure that the application leve
385 384 handlers are called in the application thread.
386 385 """
387 386 raise NotImplementedError('call_handlers must be defined in a subclass.')
388 387
389 388 def flush(self, timeout=1.0):
390 389 """Immediately processes all pending messages on the SUB channel.
391 390
392 391 Callers should use this method to ensure that :method:`call_handlers`
393 392 has been called for all messages that have been received on the
394 393 0MQ SUB socket of this channel.
395 394
396 395 This method is thread safe.
397 396
398 397 Parameters
399 398 ----------
400 399 timeout : float, optional
401 400 The maximum amount of time to spend flushing, in seconds. The
402 401 default is one second.
403 402 """
404 403 # We do the IOLoop callback process twice to ensure that the IOLoop
405 404 # gets to perform at least one full poll.
406 405 stop_time = time.time() + timeout
407 406 for i in xrange(2):
408 407 self._flushed = False
409 408 self.ioloop.add_callback(self._flush)
410 409 while not self._flushed and time.time() < stop_time:
411 410 time.sleep(0.01)
412 411
413 412 def _handle_events(self, socket, events):
414 413 # Turn on and off POLLOUT depending on if we have made a request
415 414 if events & POLLERR:
416 415 self._handle_err()
417 416 if events & POLLIN:
418 417 self._handle_recv()
419 418
420 419 def _handle_err(self):
421 420 # We don't want to let this go silently, so eventually we should log.
422 421 raise zmq.ZMQError()
423 422
424 423 def _handle_recv(self):
425 424 # Get all of the messages we can
426 425 while True:
427 426 try:
428 427 msg = self.socket.recv_json(zmq.NOBLOCK)
429 428 except zmq.ZMQError:
430 429 # Check the errno?
431 430 # Will this trigger POLLERR?
432 431 break
433 432 else:
434 433 self.call_handlers(msg)
435 434
436 435 def _flush(self):
437 436 """Callback for :method:`self.flush`."""
438 437 self._flushed = True
439 438
440 439
441 440 class RepSocketChannel(ZmqSocketChannel):
442 441 """A reply channel to handle raw_input requests that the kernel makes."""
443 442
444 443 msg_queue = None
445 444
446 445 def __init__(self, context, session, address):
447 446 super(RepSocketChannel, self).__init__(context, session, address)
448 447 self.ioloop = ioloop.IOLoop()
449 448 self.msg_queue = Queue()
450 449
451 450 def run(self):
452 451 """The thread's main activity. Call start() instead."""
453 452 self.socket = self.context.socket(zmq.XREQ)
454 453 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
455 454 self.socket.connect('tcp://%s:%i' % self.address)
456 455 self.iostate = POLLERR|POLLIN
457 456 self.ioloop.add_handler(self.socket, self._handle_events,
458 457 self.iostate)
459 458 self.ioloop.start()
460 459
461 460 def stop(self):
462 461 self.ioloop.stop()
463 462 super(RepSocketChannel, self).stop()
464 463
465 464 def call_handlers(self, msg):
466 465 """This method is called in the ioloop thread when a message arrives.
467 466
468 467 Subclasses should override this method to handle incoming messages.
469 468 It is important to remember that this method is called in the thread
470 469 so that some logic must be done to ensure that the application leve
471 470 handlers are called in the application thread.
472 471 """
473 472 raise NotImplementedError('call_handlers must be defined in a subclass.')
474 473
475 474 def input(self, string):
476 475 """Send a string of raw input to the kernel."""
477 476 content = dict(value=string)
478 477 msg = self.session.msg('input_reply', content)
479 478 self._queue_reply(msg)
480 479
481 480 def _handle_events(self, socket, events):
482 481 if events & POLLERR:
483 482 self._handle_err()
484 483 if events & POLLOUT:
485 484 self._handle_send()
486 485 if events & POLLIN:
487 486 self._handle_recv()
488 487
489 488 def _handle_recv(self):
490 489 msg = self.socket.recv_json()
491 490 self.call_handlers(msg)
492 491
493 492 def _handle_send(self):
494 493 try:
495 494 msg = self.msg_queue.get(False)
496 495 except Empty:
497 496 pass
498 497 else:
499 498 self.socket.send_json(msg)
500 499 if self.msg_queue.empty():
501 500 self.drop_io_state(POLLOUT)
502 501
503 502 def _handle_err(self):
504 503 # We don't want to let this go silently, so eventually we should log.
505 504 raise zmq.ZMQError()
506 505
507 506 def _queue_reply(self, msg):
508 507 self.msg_queue.put(msg)
509 508 self.add_io_state(POLLOUT)
510 509
511 510
512 511 class HBSocketChannel(ZmqSocketChannel):
513 512 """The heartbeat channel which monitors the kernel heartbeat.
514 513
515 514 Note that the heartbeat channel is paused by default. As long as you start
516 515 this channel, the kernel manager will ensure that it is paused and un-paused
517 516 as appropriate.
518 517 """
519 518
520 519 time_to_dead = 3.0
521 520 socket = None
522 521 poller = None
523 522 _running = None
524 523 _pause = None
525 524
526 525 def __init__(self, context, session, address):
527 526 super(HBSocketChannel, self).__init__(context, session, address)
528 527 self._running = False
529 528 self._pause = True
530 529
531 530 def _create_socket(self):
532 531 self.socket = self.context.socket(zmq.REQ)
533 532 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
534 533 self.socket.connect('tcp://%s:%i' % self.address)
535 534 self.poller = zmq.Poller()
536 535 self.poller.register(self.socket, zmq.POLLIN)
537 536
538 537 def run(self):
539 538 """The thread's main activity. Call start() instead."""
540 539 self._create_socket()
541 540 self._running = True
542 541 while self._running:
543 542 if self._pause:
544 543 time.sleep(self.time_to_dead)
545 544 else:
546 545 since_last_heartbeat = 0.0
547 546 request_time = time.time()
548 547 try:
549 548 #io.rprint('Ping from HB channel') # dbg
550 549 self.socket.send_json('ping')
551 550 except zmq.ZMQError, e:
552 551 #io.rprint('*** HB Error:', e) # dbg
553 552 if e.errno == zmq.EFSM:
554 553 #io.rprint('sleep...', self.time_to_dead) # dbg
555 554 time.sleep(self.time_to_dead)
556 555 self._create_socket()
557 556 else:
558 557 raise
559 558 else:
560 559 while True:
561 560 try:
562 561 self.socket.recv_json(zmq.NOBLOCK)
563 562 except zmq.ZMQError, e:
564 563 #io.rprint('*** HB Error 2:', e) # dbg
565 564 if e.errno == zmq.EAGAIN:
566 565 before_poll = time.time()
567 566 until_dead = self.time_to_dead - (before_poll -
568 567 request_time)
569 568
570 569 # When the return value of poll() is an empty
571 570 # list, that is when things have gone wrong
572 571 # (zeromq bug). As long as it is not an empty
573 572 # list, poll is working correctly even if it
574 573 # returns quickly. Note: poll timeout is in
575 574 # milliseconds.
576 575 self.poller.poll(1000*until_dead)
577 576
578 577 since_last_heartbeat = time.time()-request_time
579 578 if since_last_heartbeat > self.time_to_dead:
580 579 self.call_handlers(since_last_heartbeat)
581 580 break
582 581 else:
583 582 # FIXME: We should probably log this instead.
584 583 raise
585 584 else:
586 585 until_dead = self.time_to_dead - (time.time() -
587 586 request_time)
588 587 if until_dead > 0.0:
589 588 #io.rprint('sleep...', self.time_to_dead) # dbg
590 589 time.sleep(until_dead)
591 590 break
592 591
593 592 def pause(self):
594 593 """Pause the heartbeat."""
595 594 self._pause = True
596 595
597 596 def unpause(self):
598 597 """Unpause the heartbeat."""
599 598 self._pause = False
600 599
601 600 def is_beating(self):
602 601 """Is the heartbeat running and not paused."""
603 602 if self.is_alive() and not self._pause:
604 603 return True
605 604 else:
606 605 return False
607 606
608 607 def stop(self):
609 608 self._running = False
610 609 super(HBSocketChannel, self).stop()
611 610
612 611 def call_handlers(self, since_last_heartbeat):
613 612 """This method is called in the ioloop thread when a message arrives.
614 613
615 614 Subclasses should override this method to handle incoming messages.
616 615 It is important to remember that this method is called in the thread
617 616 so that some logic must be done to ensure that the application leve
618 617 handlers are called in the application thread.
619 618 """
620 619 raise NotImplementedError('call_handlers must be defined in a subclass.')
621 620
622 621
623 622 #-----------------------------------------------------------------------------
624 623 # Main kernel manager class
625 624 #-----------------------------------------------------------------------------
626 625
627 626 class KernelManager(HasTraits):
628 627 """ Manages a kernel for a frontend.
629 628
630 629 The SUB channel is for the frontend to receive messages published by the
631 630 kernel.
632 631
633 632 The REQ channel is for the frontend to make requests of the kernel.
634 633
635 634 The REP channel is for the kernel to request stdin (raw_input) from the
636 635 frontend.
637 636 """
638 637 # The PyZMQ Context to use for communication with the kernel.
639 638 context = Instance(zmq.Context,(),{})
640 639
641 640 # The Session to use for communication with the kernel.
642 641 session = Instance(Session,(),{})
643 642
644 643 # The kernel process with which the KernelManager is communicating.
645 644 kernel = Instance(Popen)
646 645
647 646 # The addresses for the communication channels.
648 647 xreq_address = TCPAddress((LOCALHOST, 0))
649 648 sub_address = TCPAddress((LOCALHOST, 0))
650 649 rep_address = TCPAddress((LOCALHOST, 0))
651 650 hb_address = TCPAddress((LOCALHOST, 0))
652 651
653 652 # The classes to use for the various channels.
654 653 xreq_channel_class = Type(XReqSocketChannel)
655 654 sub_channel_class = Type(SubSocketChannel)
656 655 rep_channel_class = Type(RepSocketChannel)
657 656 hb_channel_class = Type(HBSocketChannel)
658 657
659 658 # Protected traits.
660 659 _launch_args = Any
661 660 _xreq_channel = Any
662 661 _sub_channel = Any
663 662 _rep_channel = Any
664 663 _hb_channel = Any
665 664
666 665 def __init__(self, **kwargs):
667 666 super(KernelManager, self).__init__(**kwargs)
668 667 # Uncomment this to try closing the context.
669 668 # atexit.register(self.context.close)
670 669
671 670 #--------------------------------------------------------------------------
672 671 # Channel management methods:
673 672 #--------------------------------------------------------------------------
674 673
675 674 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
676 675 """Starts the channels for this kernel.
677 676
678 677 This will create the channels if they do not exist and then start
679 678 them. If port numbers of 0 are being used (random ports) then you
680 679 must first call :method:`start_kernel`. If the channels have been
681 680 stopped and you call this, :class:`RuntimeError` will be raised.
682 681 """
683 682 if xreq:
684 683 self.xreq_channel.start()
685 684 if sub:
686 685 self.sub_channel.start()
687 686 if rep:
688 687 self.rep_channel.start()
689 688 if hb:
690 689 self.hb_channel.start()
691 690
692 691 def stop_channels(self):
693 692 """Stops all the running channels for this kernel.
694 693 """
695 694 if self.xreq_channel.is_alive():
696 695 self.xreq_channel.stop()
697 696 if self.sub_channel.is_alive():
698 697 self.sub_channel.stop()
699 698 if self.rep_channel.is_alive():
700 699 self.rep_channel.stop()
701 700 if self.hb_channel.is_alive():
702 701 self.hb_channel.stop()
703 702
704 703 @property
705 704 def channels_running(self):
706 705 """Are any of the channels created and running?"""
707 706 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
708 707 self.rep_channel.is_alive() or self.hb_channel.is_alive())
709 708
710 709 #--------------------------------------------------------------------------
711 710 # Kernel process management methods:
712 711 #--------------------------------------------------------------------------
713 712
714 713 def start_kernel(self, **kw):
715 714 """Starts a kernel process and configures the manager to use it.
716 715
717 716 If random ports (port=0) are being used, this method must be called
718 717 before the channels are created.
719 718
720 719 Parameters:
721 720 -----------
722 721 ipython : bool, optional (default True)
723 722 Whether to use an IPython kernel instead of a plain Python kernel.
724 723 """
725 724 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
726 725 self.rep_address, self.hb_address
727 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
728 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
729 raise RuntimeError("Can only launch a kernel on localhost."
726 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
727 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
728 raise RuntimeError("Can only launch a kernel on a local interface. "
730 729 "Make sure that the '*_address' attributes are "
731 "configured properly.")
732
730 "configured properly. "
731 "Currently valid addresses are: %s"%LOCAL_IPS
732 )
733
733 734 self._launch_args = kw.copy()
734 735 if kw.pop('ipython', True):
735 736 from ipkernel import launch_kernel
736 737 else:
737 738 from pykernel import launch_kernel
738 self.kernel, xrep, pub, req, hb = launch_kernel(
739 self.kernel, xrep, pub, req, _hb = launch_kernel(
739 740 xrep_port=xreq[1], pub_port=sub[1],
740 741 req_port=rep[1], hb_port=hb[1], **kw)
741 self.xreq_address = (LOCALHOST, xrep)
742 self.sub_address = (LOCALHOST, pub)
743 self.rep_address = (LOCALHOST, req)
744 self.hb_address = (LOCALHOST, hb)
742 self.xreq_address = (xreq[0], xrep)
743 self.sub_address = (sub[0], pub)
744 self.rep_address = (rep[0], req)
745 self.hb_address = (hb[0], _hb)
745 746
746 747 def shutdown_kernel(self, restart=False):
747 748 """ Attempts to the stop the kernel process cleanly. If the kernel
748 749 cannot be stopped, it is killed, if possible.
749 750 """
750 751 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
751 752 if sys.platform == 'win32':
752 753 self.kill_kernel()
753 754 return
754 755
755 756 # Pause the heart beat channel if it exists.
756 757 if self._hb_channel is not None:
757 758 self._hb_channel.pause()
758 759
759 760 # Don't send any additional kernel kill messages immediately, to give
760 761 # the kernel a chance to properly execute shutdown actions. Wait for at
761 762 # most 1s, checking every 0.1s.
762 763 self.xreq_channel.shutdown(restart=restart)
763 764 for i in range(10):
764 765 if self.is_alive:
765 766 time.sleep(0.1)
766 767 else:
767 768 break
768 769 else:
769 770 # OK, we've waited long enough.
770 771 if self.has_kernel:
771 772 self.kill_kernel()
772 773
773 774 def restart_kernel(self, now=False):
774 775 """Restarts a kernel with the same arguments that were used to launch
775 776 it. If the old kernel was launched with random ports, the same ports
776 777 will be used for the new kernel.
777 778
778 779 Parameters
779 780 ----------
780 781 now : bool, optional
781 782 If True, the kernel is forcefully restarted *immediately*, without
782 783 having a chance to do any cleanup action. Otherwise the kernel is
783 784 given 1s to clean up before a forceful restart is issued.
784 785
785 786 In all cases the kernel is restarted, the only difference is whether
786 787 it is given a chance to perform a clean shutdown or not.
787 788 """
788 789 if self._launch_args is None:
789 790 raise RuntimeError("Cannot restart the kernel. "
790 791 "No previous call to 'start_kernel'.")
791 792 else:
792 793 if self.has_kernel:
793 794 if now:
794 795 self.kill_kernel()
795 796 else:
796 797 self.shutdown_kernel(restart=True)
797 798 self.start_kernel(**self._launch_args)
798 799
799 800 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
800 801 # unless there is some delay here.
801 802 if sys.platform == 'win32':
802 803 time.sleep(0.2)
803 804
804 805 @property
805 806 def has_kernel(self):
806 807 """Returns whether a kernel process has been specified for the kernel
807 808 manager.
808 809 """
809 810 return self.kernel is not None
810 811
811 812 def kill_kernel(self):
812 813 """ Kill the running kernel. """
813 814 if self.has_kernel:
814 815 # Pause the heart beat channel if it exists.
815 816 if self._hb_channel is not None:
816 817 self._hb_channel.pause()
817 818
818 819 # Attempt to kill the kernel.
819 820 try:
820 821 self.kernel.kill()
821 822 except OSError, e:
822 823 # In Windows, we will get an Access Denied error if the process
823 824 # has already terminated. Ignore it.
824 825 if not (sys.platform == 'win32' and e.winerror == 5):
825 826 raise
826 827 self.kernel = None
827 828 else:
828 829 raise RuntimeError("Cannot kill kernel. No kernel is running!")
829 830
830 831 def interrupt_kernel(self):
831 832 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
832 833 well supported on all platforms.
833 834 """
834 835 if self.has_kernel:
835 836 if sys.platform == 'win32':
836 837 from parentpoller import ParentPollerWindows as Poller
837 838 Poller.send_interrupt(self.kernel.win32_interrupt_event)
838 839 else:
839 840 self.kernel.send_signal(signal.SIGINT)
840 841 else:
841 842 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
842 843
843 844 def signal_kernel(self, signum):
844 845 """ Sends a signal to the kernel. Note that since only SIGTERM is
845 846 supported on Windows, this function is only useful on Unix systems.
846 847 """
847 848 if self.has_kernel:
848 849 self.kernel.send_signal(signum)
849 850 else:
850 851 raise RuntimeError("Cannot signal kernel. No kernel is running!")
851 852
852 853 @property
853 854 def is_alive(self):
854 855 """Is the kernel process still running?"""
855 856 # FIXME: not using a heartbeat means this method is broken for any
856 857 # remote kernel, it's only capable of handling local kernels.
857 858 if self.has_kernel:
858 859 if self.kernel.poll() is None:
859 860 return True
860 861 else:
861 862 return False
862 863 else:
863 864 # We didn't start the kernel with this KernelManager so we don't
864 865 # know if it is running. We should use a heartbeat for this case.
865 866 return True
866 867
867 868 #--------------------------------------------------------------------------
868 869 # Channels used for communication with the kernel:
869 870 #--------------------------------------------------------------------------
870 871
871 872 @property
872 873 def xreq_channel(self):
873 874 """Get the REQ socket channel object to make requests of the kernel."""
874 875 if self._xreq_channel is None:
875 876 self._xreq_channel = self.xreq_channel_class(self.context,
876 877 self.session,
877 878 self.xreq_address)
878 879 return self._xreq_channel
879 880
880 881 @property
881 882 def sub_channel(self):
882 883 """Get the SUB socket channel object."""
883 884 if self._sub_channel is None:
884 885 self._sub_channel = self.sub_channel_class(self.context,
885 886 self.session,
886 887 self.sub_address)
887 888 return self._sub_channel
888 889
889 890 @property
890 891 def rep_channel(self):
891 892 """Get the REP socket channel object to handle stdin (raw_input)."""
892 893 if self._rep_channel is None:
893 894 self._rep_channel = self.rep_channel_class(self.context,
894 895 self.session,
895 896 self.rep_address)
896 897 return self._rep_channel
897 898
898 899 @property
899 900 def hb_channel(self):
900 901 """Get the REP socket channel object to handle stdin (raw_input)."""
901 902 if self._hb_channel is None:
902 903 self._hb_channel = self.hb_channel_class(self.context,
903 904 self.session,
904 905 self.hb_address)
905 906 return self._hb_channel
@@ -1,296 +1,305 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 # Standard library imports.
18 18 import __builtin__
19 19 from code import CommandCompiler
20 20 import sys
21 21 import time
22 22 import traceback
23 23
24 24 # System library imports.
25 25 import zmq
26 26
27 27 # Local imports.
28 28 from IPython.utils.traitlets import HasTraits, Instance
29 29 from completer import KernelCompleter
30 30 from entry_point import base_launch_kernel, make_default_main
31 31 from session import Session, Message
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Main kernel class
35 35 #-----------------------------------------------------------------------------
36 36
37 37 class Kernel(HasTraits):
38 38
39 39 # Private interface
40 40
41 41 # This is a dict of port number that the kernel is listening on. It is set
42 42 # by record_ports and used by connect_request.
43 43 _recorded_ports = None
44 44
45 45 #---------------------------------------------------------------------------
46 46 # Kernel interface
47 47 #---------------------------------------------------------------------------
48 48
49 49 session = Instance(Session)
50 50 reply_socket = Instance('zmq.Socket')
51 51 pub_socket = Instance('zmq.Socket')
52 52 req_socket = Instance('zmq.Socket')
53 53
54 54 def __init__(self, **kwargs):
55 55 super(Kernel, self).__init__(**kwargs)
56 56 self.user_ns = {}
57 57 self.history = []
58 58 self.compiler = CommandCompiler()
59 59 self.completer = KernelCompleter(self.user_ns)
60 60
61 61 # Build dict of handlers for message types
62 62 msg_types = [ 'execute_request', 'complete_request',
63 63 'object_info_request', 'shutdown_request' ]
64 64 self.handlers = {}
65 65 for msg_type in msg_types:
66 66 self.handlers[msg_type] = getattr(self, msg_type)
67 67
68 68 def start(self):
69 69 """ Start the kernel main loop.
70 70 """
71 71 while True:
72 72 ident = self.reply_socket.recv()
73 73 assert self.reply_socket.rcvmore(), "Missing message part."
74 74 msg = self.reply_socket.recv_json()
75 75 omsg = Message(msg)
76 76 print>>sys.__stdout__
77 77 print>>sys.__stdout__, omsg
78 78 handler = self.handlers.get(omsg.msg_type, None)
79 79 if handler is None:
80 80 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
81 81 else:
82 82 handler(ident, omsg)
83 83
84 84 def record_ports(self, xrep_port, pub_port, req_port, hb_port):
85 85 """Record the ports that this kernel is using.
86 86
87 87 The creator of the Kernel instance must call this methods if they
88 88 want the :meth:`connect_request` method to return the port numbers.
89 89 """
90 90 self._recorded_ports = {
91 91 'xrep_port' : xrep_port,
92 92 'pub_port' : pub_port,
93 93 'req_port' : req_port,
94 94 'hb_port' : hb_port
95 95 }
96 96
97 97 #---------------------------------------------------------------------------
98 98 # Kernel request handlers
99 99 #---------------------------------------------------------------------------
100 100
101 101 def execute_request(self, ident, parent):
102 102 try:
103 103 code = parent[u'content'][u'code']
104 104 except:
105 105 print>>sys.__stderr__, "Got bad msg: "
106 106 print>>sys.__stderr__, Message(parent)
107 107 return
108 108 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
109 109 self.pub_socket.send_json(pyin_msg)
110 110
111 111 try:
112 112 comp_code = self.compiler(code, '<zmq-kernel>')
113 113
114 114 # Replace raw_input. Note that is not sufficient to replace
115 115 # raw_input in the user namespace.
116 116 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
117 117 __builtin__.raw_input = raw_input
118 118
119 119 # Set the parent message of the display hook and out streams.
120 120 sys.displayhook.set_parent(parent)
121 121 sys.stdout.set_parent(parent)
122 122 sys.stderr.set_parent(parent)
123 123
124 124 exec comp_code in self.user_ns, self.user_ns
125 125 except:
126 126 etype, evalue, tb = sys.exc_info()
127 127 tb = traceback.format_exception(etype, evalue, tb)
128 128 exc_content = {
129 129 u'status' : u'error',
130 130 u'traceback' : tb,
131 131 u'ename' : unicode(etype.__name__),
132 132 u'evalue' : unicode(evalue)
133 133 }
134 134 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
135 135 self.pub_socket.send_json(exc_msg)
136 136 reply_content = exc_content
137 137 else:
138 138 reply_content = { 'status' : 'ok', 'payload' : {} }
139 139
140 140 # Flush output before sending the reply.
141 141 sys.stderr.flush()
142 142 sys.stdout.flush()
143 143
144 144 # Send the reply.
145 145 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
146 146 print>>sys.__stdout__, Message(reply_msg)
147 147 self.reply_socket.send(ident, zmq.SNDMORE)
148 148 self.reply_socket.send_json(reply_msg)
149 149 if reply_msg['content']['status'] == u'error':
150 150 self._abort_queue()
151 151
152 152 def complete_request(self, ident, parent):
153 153 matches = {'matches' : self._complete(parent),
154 154 'status' : 'ok'}
155 155 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
156 156 matches, parent, ident)
157 157 print >> sys.__stdout__, completion_msg
158 158
159 159 def object_info_request(self, ident, parent):
160 160 context = parent['content']['oname'].split('.')
161 161 object_info = self._object_info(context)
162 162 msg = self.session.send(self.reply_socket, 'object_info_reply',
163 163 object_info, parent, ident)
164 164 print >> sys.__stdout__, msg
165 165
166 166 def shutdown_request(self, ident, parent):
167 167 content = dict(parent['content'])
168 168 msg = self.session.send(self.reply_socket, 'shutdown_reply',
169 169 content, parent, ident)
170 170 msg = self.session.send(self.pub_socket, 'shutdown_reply',
171 171 content, parent, ident)
172 172 print >> sys.__stdout__, msg
173 173 time.sleep(0.1)
174 174 sys.exit(0)
175 175
176 176 #---------------------------------------------------------------------------
177 177 # Protected interface
178 178 #---------------------------------------------------------------------------
179 179
180 180 def _abort_queue(self):
181 181 while True:
182 182 try:
183 183 ident = self.reply_socket.recv(zmq.NOBLOCK)
184 184 except zmq.ZMQError, e:
185 185 if e.errno == zmq.EAGAIN:
186 186 break
187 187 else:
188 188 assert self.reply_socket.rcvmore(), "Missing message part."
189 189 msg = self.reply_socket.recv_json()
190 190 print>>sys.__stdout__, "Aborting:"
191 191 print>>sys.__stdout__, Message(msg)
192 192 msg_type = msg['msg_type']
193 193 reply_type = msg_type.split('_')[0] + '_reply'
194 194 reply_msg = self.session.msg(reply_type, {'status':'aborted'}, msg)
195 195 print>>sys.__stdout__, Message(reply_msg)
196 196 self.reply_socket.send(ident,zmq.SNDMORE)
197 197 self.reply_socket.send_json(reply_msg)
198 198 # We need to wait a bit for requests to come in. This can probably
199 199 # be set shorter for true asynchronous clients.
200 200 time.sleep(0.1)
201 201
202 202 def _raw_input(self, prompt, ident, parent):
203 203 # Flush output before making the request.
204 204 sys.stderr.flush()
205 205 sys.stdout.flush()
206 206
207 207 # Send the input request.
208 208 content = dict(prompt=prompt)
209 209 msg = self.session.msg(u'input_request', content, parent)
210 210 self.req_socket.send_json(msg)
211 211
212 212 # Await a response.
213 213 reply = self.req_socket.recv_json()
214 214 try:
215 215 value = reply['content']['value']
216 216 except:
217 217 print>>sys.__stderr__, "Got bad raw_input reply: "
218 218 print>>sys.__stderr__, Message(parent)
219 219 value = ''
220 220 return value
221 221
222 222 def _complete(self, msg):
223 223 return self.completer.complete(msg.content.line, msg.content.text)
224 224
225 225 def _object_info(self, context):
226 226 symbol, leftover = self._symbol_from_context(context)
227 227 if symbol is not None and not leftover:
228 228 doc = getattr(symbol, '__doc__', '')
229 229 else:
230 230 doc = ''
231 231 object_info = dict(docstring = doc)
232 232 return object_info
233 233
234 234 def _symbol_from_context(self, context):
235 235 if not context:
236 236 return None, context
237 237
238 238 base_symbol_string = context[0]
239 239 symbol = self.user_ns.get(base_symbol_string, None)
240 240 if symbol is None:
241 241 symbol = __builtin__.__dict__.get(base_symbol_string, None)
242 242 if symbol is None:
243 243 return None, context
244 244
245 245 context = context[1:]
246 246 for i, name in enumerate(context):
247 247 new_symbol = getattr(symbol, name, None)
248 248 if new_symbol is None:
249 249 return symbol, context[i:]
250 250 else:
251 251 symbol = new_symbol
252 252
253 253 return symbol, []
254 254
255 255 #-----------------------------------------------------------------------------
256 256 # Kernel main and launch functions
257 257 #-----------------------------------------------------------------------------
258 258
259 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, hb_port=0,
259 def launch_kernel(ip=None, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
260 260 independent=False):
261 261 """ Launches a localhost kernel, binding to the specified ports.
262 262
263 263 Parameters
264 264 ----------
265 ip : str, optional
266 The ip address the kernel will bind to.
267
265 268 xrep_port : int, optional
266 269 The port to use for XREP channel.
267 270
268 271 pub_port : int, optional
269 272 The port to use for the SUB channel.
270 273
271 274 req_port : int, optional
272 275 The port to use for the REQ (raw input) channel.
273 276
274 277 hb_port : int, optional
275 278 The port to use for the hearbeat REP channel.
276 279
277 280 independent : bool, optional (default False)
278 281 If set, the kernel process is guaranteed to survive if this process
279 282 dies. If not set, an effort is made to ensure that the kernel is killed
280 283 when this process dies. Note that in this case it is still good practice
281 284 to kill kernels manually before exiting.
282 285
283 286 Returns
284 287 -------
285 288 A tuple of form:
286 289 (kernel_process, xrep_port, pub_port, req_port)
287 290 where kernel_process is a Popen object and the ports are integers.
288 291 """
292 extra_arguments = []
293 if ip is not None:
294 extra_arguments.append('--ip')
295 if isinstance(ip, basestring):
296 extra_arguments.append(ip)
297
289 298 return base_launch_kernel('from IPython.zmq.pykernel import main; main()',
290 299 xrep_port, pub_port, req_port, hb_port,
291 independent)
300 independent, extra_arguments=extra_arguments)
292 301
293 302 main = make_default_main(Kernel)
294 303
295 304 if __name__ == '__main__':
296 305 main()
General Comments 0
You need to be logged in to leave comments. Login now