##// END OF EJS Templates
* Added a function for spawning a localhost kernel in a new process on random ports....
epatters -
Show More
@@ -1,82 +1,95 b''
1 1 # System library imports
2 2 from PyQt4 import QtCore, QtGui
3 3
4 4 # Local imports
5 5 from frontend_widget import FrontendWidget
6 6
7 7
8 8 class IPythonWidget(FrontendWidget):
9 9 """ A FrontendWidget for an IPython kernel.
10 10 """
11 11
12 12 #---------------------------------------------------------------------------
13 13 # 'FrontendWidget' interface
14 14 #---------------------------------------------------------------------------
15 15
16 16 def __init__(self, kernel_manager, parent=None):
17 17 super(IPythonWidget, self).__init__(kernel_manager, parent)
18 18
19 19 self._magic_overrides = {}
20 20
21 21 def execute_source(self, source, hidden=False, interactive=False):
22 22 """ Reimplemented to override magic commands.
23 23 """
24 24 magic_source = source.strip()
25 25 if magic_source.startswith('%'):
26 26 magic_source = magic_source[1:]
27 27 magic, sep, arguments = magic_source.partition(' ')
28 28 if not magic:
29 29 magic = magic_source
30 30
31 31 callback = self._magic_overrides.get(magic)
32 32 if callback:
33 33 output = callback(arguments)
34 34 if output:
35 35 self.appendPlainText(output)
36 36 self._show_prompt('>>> ')
37 37 return True
38 38 else:
39 39 return super(IPythonWidget, self).execute_source(source, hidden,
40 40 interactive)
41 41
42 42 #---------------------------------------------------------------------------
43 43 # 'IPythonWidget' interface
44 44 #---------------------------------------------------------------------------
45 45
46 46 def set_magic_override(self, magic, callback):
47 47 """ Overrides an IPython magic command. This magic will be intercepted
48 48 by the frontend rather than passed on to the kernel and 'callback'
49 49 will be called with a single argument: a string of argument(s) for
50 50 the magic. The callback can (optionally) return text to print to the
51 51 console.
52 52 """
53 53 self._magic_overrides[magic] = callback
54 54
55 55 def remove_magic_override(self, magic):
56 56 """ Removes the override for the specified magic, if there is one.
57 57 """
58 58 try:
59 59 del self._magic_overrides[magic]
60 60 except KeyError:
61 61 pass
62 62
63 63
64 64 if __name__ == '__main__':
65 import sys
65 from IPython.external.argparse import ArgumentParser
66 66 from IPython.frontend.qt.kernelmanager import QtKernelManager
67 67
68 # Don't let Qt swallow KeyboardInterupts.
69 import signal
70 signal.signal(signal.SIGINT, signal.SIG_DFL)
71
72 # Parse command line arguments.
73 parser = ArgumentParser()
74 parser.add_argument('--ip', type=str, default='127.0.0.1',
75 help='set the kernel\'s IP address [default localhost]')
76 parser.add_argument('--xreq', type=int, metavar='PORT', default=5575,
77 help='set the XREQ Channel port [default %(default)i]')
78 parser.add_argument('--sub', type=int, metavar='PORT', default=5576,
79 help='set the SUB Channel port [default %(default)i]')
80 namespace = parser.parse_args()
81
68 82 # Create KernelManager
69 kernel_manager = QtKernelManager(xreq_address = ('127.0.0.1', 5575),
70 sub_address = ('127.0.0.1', 5576),
71 rep_address = ('127.0.0.1', 5577))
72 kernel_manager.sub_channel.start()
73 kernel_manager.xreq_channel.start()
83 ip = namespace.ip
84 kernel_manager = QtKernelManager(xreq_address = (ip, namespace.xreq),
85 sub_address = (ip, namespace.sub))
86 kernel_manager.start_listening()
74 87
75 88 # Launch application
76 app = QtGui.QApplication(sys.argv)
89 app = QtGui.QApplication([])
77 90 widget = IPythonWidget(kernel_manager)
78 91 widget.setWindowTitle('Python')
79 92 widget.resize(640, 480)
80 93 widget.show()
81 sys.exit(app.exec_())
94 app.exec_()
82 95
@@ -1,311 +1,349 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Finish implementing `raw_input`.
7 7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 8 call set_parent on all the PUB objects with the message about to be executed.
9 9 * Implement random port and security key logic.
10 10 * Implement control messages.
11 11 * Implement event loop and poll version.
12 12 """
13 13
14 # Standard library imports.
14 15 import __builtin__
15 16 import sys
16 17 import time
17 18 import traceback
18
19 19 from code import CommandCompiler
20 20
21 # System library imports.
21 22 import zmq
22 23
24 # Local imports.
23 25 from session import Session, Message, extract_header
24 26 from completer import KernelCompleter
25 27
28
26 29 class OutStream(object):
27 30 """A file like object that publishes the stream to a 0MQ PUB socket."""
28 31
29 32 def __init__(self, session, pub_socket, name, max_buffer=200):
30 33 self.session = session
31 34 self.pub_socket = pub_socket
32 35 self.name = name
33 36 self._buffer = []
34 37 self._buffer_len = 0
35 38 self.max_buffer = max_buffer
36 39 self.parent_header = {}
37 40
38 41 def set_parent(self, parent):
39 42 self.parent_header = extract_header(parent)
40 43
41 44 def close(self):
42 45 self.pub_socket = None
43 46
44 47 def flush(self):
45 48 if self.pub_socket is None:
46 49 raise ValueError(u'I/O operation on closed file')
47 50 else:
48 51 if self._buffer:
49 52 data = ''.join(self._buffer)
50 53 content = {u'name':self.name, u'data':data}
51 54 msg = self.session.msg(u'stream', content=content,
52 55 parent=self.parent_header)
53 56 print>>sys.__stdout__, Message(msg)
54 57 self.pub_socket.send_json(msg)
55 58 self._buffer_len = 0
56 59 self._buffer = []
57 60
58 61 def isattr(self):
59 62 return False
60 63
61 64 def next(self):
62 65 raise IOError('Read not supported on a write only stream.')
63 66
64 67 def read(self, size=None):
65 68 raise IOError('Read not supported on a write only stream.')
66 69
67 70 readline=read
68 71
69 72 def write(self, s):
70 73 if self.pub_socket is None:
71 74 raise ValueError('I/O operation on closed file')
72 75 else:
73 76 self._buffer.append(s)
74 77 self._buffer_len += len(s)
75 78 self._maybe_send()
76 79
77 80 def _maybe_send(self):
78 81 if '\n' in self._buffer[-1]:
79 82 self.flush()
80 83 if self._buffer_len > self.max_buffer:
81 84 self.flush()
82 85
83 86 def writelines(self, sequence):
84 87 if self.pub_socket is None:
85 88 raise ValueError('I/O operation on closed file')
86 89 else:
87 90 for s in sequence:
88 91 self.write(s)
89 92
90 93
91 94 class DisplayHook(object):
92 95
93 96 def __init__(self, session, pub_socket):
94 97 self.session = session
95 98 self.pub_socket = pub_socket
96 99 self.parent_header = {}
97 100
98 101 def __call__(self, obj):
99 102 if obj is None:
100 103 return
101 104
102 105 __builtin__._ = obj
103 106 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
104 107 parent=self.parent_header)
105 108 self.pub_socket.send_json(msg)
106 109
107 110 def set_parent(self, parent):
108 111 self.parent_header = extract_header(parent)
109 112
110 113
111 114 class RawInput(object):
112 115
113 116 def __init__(self, session, socket):
114 117 self.session = session
115 118 self.socket = socket
116 119
117 120 def __call__(self, prompt=None):
118 121 msg = self.session.msg(u'raw_input')
119 122 self.socket.send_json(msg)
120 123 while True:
121 124 try:
122 125 reply = self.socket.recv_json(zmq.NOBLOCK)
123 126 except zmq.ZMQError, e:
124 127 if e.errno == zmq.EAGAIN:
125 128 pass
126 129 else:
127 130 raise
128 131 else:
129 132 break
130 133 return reply[u'content'][u'data']
131 134
132 135
133 136 class Kernel(object):
134 137
135 138 def __init__(self, session, reply_socket, pub_socket):
136 139 self.session = session
137 140 self.reply_socket = reply_socket
138 141 self.pub_socket = pub_socket
139 142 self.user_ns = {}
140 143 self.history = []
141 144 self.compiler = CommandCompiler()
142 145 self.completer = KernelCompleter(self.user_ns)
143 146
144 147 # Build dict of handlers for message types
145 148 msg_types = [ 'execute_request', 'complete_request',
146 149 'object_info_request' ]
147 150 self.handlers = {}
148 151 for msg_type in msg_types:
149 152 self.handlers[msg_type] = getattr(self, msg_type)
150 153
151 154 def abort_queue(self):
152 155 while True:
153 156 try:
154 157 ident = self.reply_socket.recv(zmq.NOBLOCK)
155 158 except zmq.ZMQError, e:
156 159 if e.errno == zmq.EAGAIN:
157 160 break
158 161 else:
159 162 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
160 163 msg = self.reply_socket.recv_json()
161 164 print>>sys.__stdout__, "Aborting:"
162 165 print>>sys.__stdout__, Message(msg)
163 166 msg_type = msg['msg_type']
164 167 reply_type = msg_type.split('_')[0] + '_reply'
165 168 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
166 169 print>>sys.__stdout__, Message(reply_msg)
167 170 self.reply_socket.send(ident,zmq.SNDMORE)
168 171 self.reply_socket.send_json(reply_msg)
169 172 # We need to wait a bit for requests to come in. This can probably
170 173 # be set shorter for true asynchronous clients.
171 174 time.sleep(0.1)
172 175
173 176 def execute_request(self, ident, parent):
174 177 try:
175 178 code = parent[u'content'][u'code']
176 179 except:
177 180 print>>sys.__stderr__, "Got bad msg: "
178 181 print>>sys.__stderr__, Message(parent)
179 182 return
180 183 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
181 184 self.pub_socket.send_json(pyin_msg)
182 185 try:
183 186 comp_code = self.compiler(code, '<zmq-kernel>')
184 187 sys.displayhook.set_parent(parent)
185 188 exec comp_code in self.user_ns, self.user_ns
186 189 except:
187 190 result = u'error'
188 191 etype, evalue, tb = sys.exc_info()
189 192 tb = traceback.format_exception(etype, evalue, tb)
190 193 exc_content = {
191 194 u'status' : u'error',
192 195 u'traceback' : tb,
193 196 u'etype' : unicode(etype),
194 197 u'evalue' : unicode(evalue)
195 198 }
196 199 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
197 200 self.pub_socket.send_json(exc_msg)
198 201 reply_content = exc_content
199 202 else:
200 203 reply_content = {'status' : 'ok'}
201 204 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
202 205 print>>sys.__stdout__, Message(reply_msg)
203 206 self.reply_socket.send(ident, zmq.SNDMORE)
204 207 self.reply_socket.send_json(reply_msg)
205 208 if reply_msg['content']['status'] == u'error':
206 209 self.abort_queue()
207 210
208 211 def complete_request(self, ident, parent):
209 212 matches = {'matches' : self.complete(parent),
210 213 'status' : 'ok'}
211 214 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
212 215 matches, parent, ident)
213 216 print >> sys.__stdout__, completion_msg
214 217
215 218 def complete(self, msg):
216 219 return self.completer.complete(msg.content.line, msg.content.text)
217 220
218 221 def object_info_request(self, ident, parent):
219 222 context = parent['content']['oname'].split('.')
220 223 object_info = self.object_info(context)
221 224 msg = self.session.send(self.reply_socket, 'object_info_reply',
222 225 object_info, parent, ident)
223 226 print >> sys.__stdout__, msg
224 227
225 228 def object_info(self, context):
226 229 symbol, leftover = self.symbol_from_context(context)
227 230 if symbol is not None and not leftover:
228 231 doc = getattr(symbol, '__doc__', '')
229 232 else:
230 233 doc = ''
231 234 object_info = dict(docstring = doc)
232 235 return object_info
233 236
234 237 def symbol_from_context(self, context):
235 238 if not context:
236 239 return None, context
237 240
238 241 base_symbol_string = context[0]
239 242 symbol = self.user_ns.get(base_symbol_string, None)
240 243 if symbol is None:
241 244 symbol = __builtin__.__dict__.get(base_symbol_string, None)
242 245 if symbol is None:
243 246 return None, context
244 247
245 248 context = context[1:]
246 249 for i, name in enumerate(context):
247 250 new_symbol = getattr(symbol, name, None)
248 251 if new_symbol is None:
249 252 return symbol, context[i:]
250 253 else:
251 254 symbol = new_symbol
252 255
253 256 return symbol, []
254 257
255 258 def start(self):
256 259 while True:
257 260 ident = self.reply_socket.recv()
258 261 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
259 262 msg = self.reply_socket.recv_json()
260 263 omsg = Message(msg)
261 264 print>>sys.__stdout__
262 265 print>>sys.__stdout__, omsg
263 266 handler = self.handlers.get(omsg.msg_type, None)
264 267 if handler is None:
265 268 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
266 269 else:
267 270 handler(ident, omsg)
268 271
269 272
270 def main():
271 c = zmq.Context()
272
273 ip = '127.0.0.1'
274 port_base = 5575
275 connection = ('tcp://%s' % ip) + ':%i'
276 rep_conn = connection % port_base
277 pub_conn = connection % (port_base+1)
278
273 def bind_port(socket, ip, port):
274 """ Binds the specified ZMQ socket. If the port is less than zero, a random
275 port is chosen. Returns the port that was bound.
276 """
277 connection = 'tcp://%s' % ip
278 if port < 0:
279 port = socket.bind_to_random_port(connection)
280 else:
281 connection += ':%i' % port
282 socket.bind(connection)
283 return port
284
285 def main(ip='127.0.0.1', rep_port=-1, pub_port=-1):
286 """ Start a kernel on 'ip' (default localhost) at the specified ports. If
287 ports are not specified, they are chosen at random.
288 """
279 289 print >>sys.__stdout__, "Starting the kernel..."
280 print >>sys.__stdout__, "XREP Channel:", rep_conn
281 print >>sys.__stdout__, "PUB Channel:", pub_conn
282 290
291 context = zmq.Context()
283 292 session = Session(username=u'kernel')
284 293
285 reply_socket = c.socket(zmq.XREP)
286 reply_socket.bind(rep_conn)
287
288 pub_socket = c.socket(zmq.PUB)
289 pub_socket.bind(pub_conn)
294 reply_socket = context.socket(zmq.XREP)
295 rep_port = bind_port(reply_socket, ip, rep_port)
296 print >>sys.__stdout__, "XREP Channel on port", rep_port
290 297
291 stdout = OutStream(session, pub_socket, u'stdout')
292 stderr = OutStream(session, pub_socket, u'stderr')
293 sys.stdout = stdout
294 sys.stderr = stderr
298 pub_socket = context.socket(zmq.PUB)
299 pub_port = bind_port(pub_socket, ip, pub_port)
300 print >>sys.__stdout__, "PUB Channel on port", pub_port
295 301
296 display_hook = DisplayHook(session, pub_socket)
297 sys.displayhook = display_hook
302 sys.stdout = OutStream(session, pub_socket, u'stdout')
303 sys.stderr = OutStream(session, pub_socket, u'stderr')
304 sys.displayhook = DisplayHook(session, pub_socket)
298 305
299 306 kernel = Kernel(session, reply_socket, pub_socket)
300 307
301 308 # For debugging convenience, put sleep and a string in the namespace, so we
302 309 # have them every time we start.
303 310 kernel.user_ns['sleep'] = time.sleep
304 311 kernel.user_ns['s'] = 'Test string'
305 312
306 313 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
307 314 kernel.start()
308 315
316 def launch_kernel():
317 """ Launches a kernel on this machine and binds its to channels to open
318 ports as it determined by the OS.
319
320 Returns a tuple of form:
321 (kernel_process [Popen], rep_port [int], sub_port [int])
322 """
323 import socket
324 from subprocess import Popen
325
326 # Find some open ports.
327 ports = []
328 for i in xrange(2):
329 sock = socket.socket()
330 sock.bind(('', 0))
331 ports.append(sock)
332 for i, sock in enumerate(ports):
333 port = sock.getsockname()[1]
334 sock.close()
335 ports[i] = port
336 rep_port, sub_port = ports
337
338 # Spawn a kernel.
339 command = 'from IPython.zmq.kernel import main;' \
340 'main(rep_port=%i, pub_port=%i)'
341 proc = Popen([sys.executable, '-c', command % (rep_port, sub_port)])
342
343 return proc, rep_port, sub_port
344
309 345
310 346 if __name__ == '__main__':
311 main()
347 base_port = 5575
348 main(rep_port = base_port,
349 pub_port = base_port + 1)
General Comments 0
You need to be logged in to leave comments. Login now