##// END OF EJS Templates
Separating kernel into smaller pieces.
Brian Granger -
Show More
@@ -0,0 +1,22 b''
1 import __builtin__
2
3 from session import extract_header
4
5 class DisplayHook(object):
6
7 def __init__(self, session, pub_socket):
8 self.session = session
9 self.pub_socket = pub_socket
10 self.parent_header = {}
11
12 def __call__(self, obj):
13 if obj is None:
14 return
15
16 __builtin__._ = obj
17 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
18 parent=self.parent_header)
19 self.pub_socket.send_json(msg)
20
21 def set_parent(self, parent):
22 self.parent_header = extract_header(parent) No newline at end of file
@@ -0,0 +1,42 b''
1 import os
2 import time
3 from threading import Thread
4
5
6 class ExitPollerUnix(Thread):
7 """ A Unix-specific daemon thread that terminates the program immediately
8 when the parent process no longer exists.
9 """
10
11 def __init__(self):
12 super(ExitPollerUnix, self).__init__()
13 self.daemon = True
14
15 def run(self):
16 # We cannot use os.waitpid because it works only for child processes.
17 from errno import EINTR
18 while True:
19 try:
20 if os.getppid() == 1:
21 os._exit(1)
22 time.sleep(1.0)
23 except OSError, e:
24 if e.errno == EINTR:
25 continue
26 raise
27
28 class ExitPollerWindows(Thread):
29 """ A Windows-specific daemon thread that terminates the program immediately
30 when a Win32 handle is signaled.
31 """
32
33 def __init__(self, handle):
34 super(ExitPollerWindows, self).__init__()
35 self.daemon = True
36 self.handle = handle
37
38 def run(self):
39 from _subprocess import WaitForSingleObject, WAIT_OBJECT_0, INFINITE
40 result = WaitForSingleObject(self.handle, INFINITE)
41 if result == WAIT_OBJECT_0:
42 os._exit(1) No newline at end of file
@@ -0,0 +1,77 b''
1 import sys
2 import time
3 from cStringIO import StringIO
4
5 from session import extract_header, Message
6
7 #-----------------------------------------------------------------------------
8 # Stream classes
9 #-----------------------------------------------------------------------------
10
11 class OutStream(object):
12 """A file like object that publishes the stream to a 0MQ PUB socket."""
13
14 # The time interval between automatic flushes, in seconds.
15 flush_interval = 0.05
16
17 def __init__(self, session, pub_socket, name):
18 self.session = session
19 self.pub_socket = pub_socket
20 self.name = name
21 self.parent_header = {}
22 self._new_buffer()
23
24 def set_parent(self, parent):
25 self.parent_header = extract_header(parent)
26
27 def close(self):
28 self.pub_socket = None
29
30 def flush(self):
31 if self.pub_socket is None:
32 raise ValueError(u'I/O operation on closed file')
33 else:
34 data = self._buffer.getvalue()
35 if data:
36 content = {u'name':self.name, u'data':data}
37 msg = self.session.msg(u'stream', content=content,
38 parent=self.parent_header)
39 print>>sys.__stdout__, Message(msg)
40 self.pub_socket.send_json(msg)
41
42 self._buffer.close()
43 self._new_buffer()
44
45 def isatty(self):
46 return False
47
48 def next(self):
49 raise IOError('Read not supported on a write only stream.')
50
51 def read(self, size=-1):
52 raise IOError('Read not supported on a write only stream.')
53
54 def readline(self, size=-1):
55 raise IOError('Read not supported on a write only stream.')
56
57 def write(self, string):
58 if self.pub_socket is None:
59 raise ValueError('I/O operation on closed file')
60 else:
61 self._buffer.write(string)
62 current_time = time.time()
63 if self._start <= 0:
64 self._start = current_time
65 elif current_time - self._start > self.flush_interval:
66 self.flush()
67
68 def writelines(self, sequence):
69 if self.pub_socket is None:
70 raise ValueError('I/O operation on closed file')
71 else:
72 for string in sequence:
73 self.write(string)
74
75 def _new_buffer(self):
76 self._buffer = StringIO()
77 self._start = -1 No newline at end of file
@@ -1,488 +1,359 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 * Finish implementing `raw_input`.
7 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 7 call set_parent on all the PUB objects with the message about to be executed.
9 8 * Implement random port and security key logic.
10 9 * Implement control messages.
11 10 * Implement event loop and poll version.
12 11 """
13 12
14 13 #-----------------------------------------------------------------------------
15 14 # Imports
16 15 #-----------------------------------------------------------------------------
17 16
18 17 # Standard library imports.
19 18 import __builtin__
20 19 from code import CommandCompiler
21 from cStringIO import StringIO
22 20 import os
23 21 import sys
24 from threading import Thread
25 22 import time
26 23 import traceback
27 24
28 25 # System library imports.
29 26 import zmq
30 27
31 28 # Local imports.
32 29 from IPython.external.argparse import ArgumentParser
33 from session import Session, Message, extract_header
30 from session import Session, Message
34 31 from completer import KernelCompleter
32 from .iostream import OutStream
33 from .displayhook import DisplayHook
34 from .exitpoller import ExitPollerUnix, ExitPollerWindows
35 35
36 36 #-----------------------------------------------------------------------------
37 # Kernel and stream classes
37 # Main kernel class
38 38 #-----------------------------------------------------------------------------
39 39
40 class OutStream(object):
41 """A file like object that publishes the stream to a 0MQ PUB socket."""
42
43 # The time interval between automatic flushes, in seconds.
44 flush_interval = 0.05
45
46 def __init__(self, session, pub_socket, name):
47 self.session = session
48 self.pub_socket = pub_socket
49 self.name = name
50 self.parent_header = {}
51 self._new_buffer()
52
53 def set_parent(self, parent):
54 self.parent_header = extract_header(parent)
55
56 def close(self):
57 self.pub_socket = None
58
59 def flush(self):
60 if self.pub_socket is None:
61 raise ValueError(u'I/O operation on closed file')
62 else:
63 data = self._buffer.getvalue()
64 if data:
65 content = {u'name':self.name, u'data':data}
66 msg = self.session.msg(u'stream', content=content,
67 parent=self.parent_header)
68 print>>sys.__stdout__, Message(msg)
69 self.pub_socket.send_json(msg)
70
71 self._buffer.close()
72 self._new_buffer()
73
74 def isatty(self):
75 return False
76
77 def next(self):
78 raise IOError('Read not supported on a write only stream.')
79
80 def read(self, size=-1):
81 raise IOError('Read not supported on a write only stream.')
82
83 def readline(self, size=-1):
84 raise IOError('Read not supported on a write only stream.')
85
86 def write(self, string):
87 if self.pub_socket is None:
88 raise ValueError('I/O operation on closed file')
89 else:
90 self._buffer.write(string)
91 current_time = time.time()
92 if self._start <= 0:
93 self._start = current_time
94 elif current_time - self._start > self.flush_interval:
95 self.flush()
96
97 def writelines(self, sequence):
98 if self.pub_socket is None:
99 raise ValueError('I/O operation on closed file')
100 else:
101 for string in sequence:
102 self.write(string)
103
104 def _new_buffer(self):
105 self._buffer = StringIO()
106 self._start = -1
107
108
109 class DisplayHook(object):
110
111 def __init__(self, session, pub_socket):
112 self.session = session
113 self.pub_socket = pub_socket
114 self.parent_header = {}
115
116 def __call__(self, obj):
117 if obj is None:
118 return
119
120 __builtin__._ = obj
121 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
122 parent=self.parent_header)
123 self.pub_socket.send_json(msg)
124
125 def set_parent(self, parent):
126 self.parent_header = extract_header(parent)
127
128
129 40 class Kernel(object):
130 41
131 42 def __init__(self, session, reply_socket, pub_socket, req_socket):
132 43 self.session = session
133 44 self.reply_socket = reply_socket
134 45 self.pub_socket = pub_socket
135 46 self.req_socket = req_socket
136 47 self.user_ns = {}
137 48 self.history = []
138 49 self.compiler = CommandCompiler()
139 50 self.completer = KernelCompleter(self.user_ns)
140 51
141 52 # Build dict of handlers for message types
142 53 msg_types = [ 'execute_request', 'complete_request',
143 54 'object_info_request' ]
144 55 self.handlers = {}
145 56 for msg_type in msg_types:
146 57 self.handlers[msg_type] = getattr(self, msg_type)
147 58
148 59 def abort_queue(self):
149 60 while True:
150 61 try:
151 62 ident = self.reply_socket.recv(zmq.NOBLOCK)
152 63 except zmq.ZMQError, e:
153 64 if e.errno == zmq.EAGAIN:
154 65 break
155 66 else:
156 67 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
157 68 msg = self.reply_socket.recv_json()
158 69 print>>sys.__stdout__, "Aborting:"
159 70 print>>sys.__stdout__, Message(msg)
160 71 msg_type = msg['msg_type']
161 72 reply_type = msg_type.split('_')[0] + '_reply'
162 73 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
163 74 print>>sys.__stdout__, Message(reply_msg)
164 75 self.reply_socket.send(ident,zmq.SNDMORE)
165 76 self.reply_socket.send_json(reply_msg)
166 77 # We need to wait a bit for requests to come in. This can probably
167 78 # be set shorter for true asynchronous clients.
168 79 time.sleep(0.1)
169 80
170 81 def execute_request(self, ident, parent):
171 82 try:
172 83 code = parent[u'content'][u'code']
173 84 except:
174 85 print>>sys.__stderr__, "Got bad msg: "
175 86 print>>sys.__stderr__, Message(parent)
176 87 return
177 88 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
178 89 self.pub_socket.send_json(pyin_msg)
179 90
180 91 try:
181 92 comp_code = self.compiler(code, '<zmq-kernel>')
182 93
183 94 # Replace raw_input. Note that is not sufficient to replace
184 95 # raw_input in the user namespace.
185 96 raw_input = lambda prompt='': self.raw_input(prompt, ident, parent)
186 97 __builtin__.raw_input = raw_input
187 98
188 99 # Configure the display hook.
189 100 sys.displayhook.set_parent(parent)
190 101
191 102 exec comp_code in self.user_ns, self.user_ns
192 103 except:
193 result = u'error'
194 104 etype, evalue, tb = sys.exc_info()
195 105 tb = traceback.format_exception(etype, evalue, tb)
196 106 exc_content = {
197 107 u'status' : u'error',
198 108 u'traceback' : tb,
199 109 u'ename' : unicode(etype.__name__),
200 110 u'evalue' : unicode(evalue)
201 111 }
202 112 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
203 113 self.pub_socket.send_json(exc_msg)
204 114 reply_content = exc_content
205 115 else:
206 116 reply_content = {'status' : 'ok'}
207 117
208 118 # Flush output before sending the reply.
209 119 sys.stderr.flush()
210 120 sys.stdout.flush()
211 121
212 122 # Send the reply.
213 123 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
214 124 print>>sys.__stdout__, Message(reply_msg)
215 125 self.reply_socket.send(ident, zmq.SNDMORE)
216 126 self.reply_socket.send_json(reply_msg)
217 127 if reply_msg['content']['status'] == u'error':
218 128 self.abort_queue()
219 129
220 130 def raw_input(self, prompt, ident, parent):
221 131 # Flush output before making the request.
222 132 sys.stderr.flush()
223 133 sys.stdout.flush()
224 134
225 135 # Send the input request.
226 136 content = dict(prompt=prompt)
227 137 msg = self.session.msg(u'input_request', content, parent)
228 138 self.req_socket.send_json(msg)
229 139
230 140 # Await a response.
231 141 reply = self.req_socket.recv_json()
232 142 try:
233 143 value = reply['content']['value']
234 144 except:
235 145 print>>sys.__stderr__, "Got bad raw_input reply: "
236 146 print>>sys.__stderr__, Message(parent)
237 147 value = ''
238 148 return value
239 149
240 150 def complete_request(self, ident, parent):
241 151 matches = {'matches' : self.complete(parent),
242 152 'status' : 'ok'}
243 153 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
244 154 matches, parent, ident)
245 155 print >> sys.__stdout__, completion_msg
246 156
247 157 def complete(self, msg):
248 158 return self.completer.complete(msg.content.line, msg.content.text)
249 159
250 160 def object_info_request(self, ident, parent):
251 161 context = parent['content']['oname'].split('.')
252 162 object_info = self.object_info(context)
253 163 msg = self.session.send(self.reply_socket, 'object_info_reply',
254 164 object_info, parent, ident)
255 165 print >> sys.__stdout__, msg
256 166
257 167 def object_info(self, context):
258 168 symbol, leftover = self.symbol_from_context(context)
259 169 if symbol is not None and not leftover:
260 170 doc = getattr(symbol, '__doc__', '')
261 171 else:
262 172 doc = ''
263 173 object_info = dict(docstring = doc)
264 174 return object_info
265 175
266 176 def symbol_from_context(self, context):
267 177 if not context:
268 178 return None, context
269 179
270 180 base_symbol_string = context[0]
271 181 symbol = self.user_ns.get(base_symbol_string, None)
272 182 if symbol is None:
273 183 symbol = __builtin__.__dict__.get(base_symbol_string, None)
274 184 if symbol is None:
275 185 return None, context
276 186
277 187 context = context[1:]
278 188 for i, name in enumerate(context):
279 189 new_symbol = getattr(symbol, name, None)
280 190 if new_symbol is None:
281 191 return symbol, context[i:]
282 192 else:
283 193 symbol = new_symbol
284 194
285 195 return symbol, []
286 196
287 197 def start(self):
288 198 while True:
289 199 ident = self.reply_socket.recv()
290 200 assert self.reply_socket.rcvmore(), "Missing message part."
291 201 msg = self.reply_socket.recv_json()
292 202 omsg = Message(msg)
293 203 print>>sys.__stdout__
294 204 print>>sys.__stdout__, omsg
295 205 handler = self.handlers.get(omsg.msg_type, None)
296 206 if handler is None:
297 207 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
298 208 else:
299 209 handler(ident, omsg)
300 210
301 211 #-----------------------------------------------------------------------------
302 212 # Kernel main and launch functions
303 213 #-----------------------------------------------------------------------------
304 214
305 class ExitPollerUnix(Thread):
306 """ A Unix-specific daemon thread that terminates the program immediately
307 when the parent process no longer exists.
308 """
309
310 def __init__(self):
311 super(ExitPollerUnix, self).__init__()
312 self.daemon = True
313
314 def run(self):
315 # We cannot use os.waitpid because it works only for child processes.
316 from errno import EINTR
317 while True:
318 try:
319 if os.getppid() == 1:
320 os._exit(1)
321 time.sleep(1.0)
322 except OSError, e:
323 if e.errno == EINTR:
324 continue
325 raise
326
327 class ExitPollerWindows(Thread):
328 """ A Windows-specific daemon thread that terminates the program immediately
329 when a Win32 handle is signaled.
330 """
331
332 def __init__(self, handle):
333 super(ExitPollerWindows, self).__init__()
334 self.daemon = True
335 self.handle = handle
336
337 def run(self):
338 from _subprocess import WaitForSingleObject, WAIT_OBJECT_0, INFINITE
339 result = WaitForSingleObject(self.handle, INFINITE)
340 if result == WAIT_OBJECT_0:
341 os._exit(1)
342
343
344 215 def bind_port(socket, ip, port):
345 216 """ Binds the specified ZMQ socket. If the port is less than zero, a random
346 217 port is chosen. Returns the port that was bound.
347 218 """
348 219 connection = 'tcp://%s' % ip
349 220 if port <= 0:
350 221 port = socket.bind_to_random_port(connection)
351 222 else:
352 223 connection += ':%i' % port
353 224 socket.bind(connection)
354 225 return port
355 226
356 227
357 228 def main():
358 229 """ Main entry point for launching a kernel.
359 230 """
360 231 # Parse command line arguments.
361 232 parser = ArgumentParser()
362 233 parser.add_argument('--ip', type=str, default='127.0.0.1',
363 234 help='set the kernel\'s IP address [default: local]')
364 235 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
365 236 help='set the XREP channel port [default: random]')
366 237 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
367 238 help='set the PUB channel port [default: random]')
368 239 parser.add_argument('--req', type=int, metavar='PORT', default=0,
369 240 help='set the REQ channel port [default: random]')
370 241 if sys.platform == 'win32':
371 242 parser.add_argument('--parent', type=int, metavar='HANDLE',
372 243 default=0, help='kill this process if the process '
373 244 'with HANDLE dies')
374 245 else:
375 246 parser.add_argument('--parent', action='store_true',
376 247 help='kill this process if its parent dies')
377 248 namespace = parser.parse_args()
378 249
379 250 # Create a context, a session, and the kernel sockets.
380 251 print >>sys.__stdout__, "Starting the kernel..."
381 252 context = zmq.Context()
382 253 session = Session(username=u'kernel')
383 254
384 255 reply_socket = context.socket(zmq.XREP)
385 256 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
386 257 print >>sys.__stdout__, "XREP Channel on port", xrep_port
387 258
388 259 pub_socket = context.socket(zmq.PUB)
389 260 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
390 261 print >>sys.__stdout__, "PUB Channel on port", pub_port
391 262
392 263 req_socket = context.socket(zmq.XREQ)
393 264 req_port = bind_port(req_socket, namespace.ip, namespace.req)
394 265 print >>sys.__stdout__, "REQ Channel on port", req_port
395 266
396 267 # Redirect input streams and set a display hook.
397 268 sys.stdout = OutStream(session, pub_socket, u'stdout')
398 269 sys.stderr = OutStream(session, pub_socket, u'stderr')
399 270 sys.displayhook = DisplayHook(session, pub_socket)
400 271
401 272 # Create the kernel.
402 273 kernel = Kernel(session, reply_socket, pub_socket, req_socket)
403 274
404 275 # Configure this kernel/process to die on parent termination, if necessary.
405 276 if namespace.parent:
406 277 if sys.platform == 'win32':
407 278 poller = ExitPollerWindows(namespace.parent)
408 279 else:
409 280 poller = ExitPollerUnix()
410 281 poller.start()
411 282
412 283 # Start the kernel mainloop.
413 284 kernel.start()
414 285
415 286
416 287 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
417 288 """ Launches a localhost kernel, binding to the specified ports.
418 289
419 290 Parameters
420 291 ----------
421 292 xrep_port : int, optional
422 293 The port to use for XREP channel.
423 294
424 295 pub_port : int, optional
425 296 The port to use for the SUB channel.
426 297
427 298 req_port : int, optional
428 299 The port to use for the REQ (raw input) channel.
429 300
430 301 independent : bool, optional (default False)
431 302 If set, the kernel process is guaranteed to survive if this process
432 303 dies. If not set, an effort is made to ensure that the kernel is killed
433 304 when this process dies. Note that in this case it is still good practice
434 305 to kill kernels manually before exiting.
435 306
436 307 Returns
437 308 -------
438 309 A tuple of form:
439 310 (kernel_process, xrep_port, pub_port, req_port)
440 311 where kernel_process is a Popen object and the ports are integers.
441 312 """
442 313 import socket
443 314 from subprocess import Popen
444 315
445 316 # Find open ports as necessary.
446 317 ports = []
447 318 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
448 319 for i in xrange(ports_needed):
449 320 sock = socket.socket()
450 321 sock.bind(('', 0))
451 322 ports.append(sock)
452 323 for i, sock in enumerate(ports):
453 324 port = sock.getsockname()[1]
454 325 sock.close()
455 326 ports[i] = port
456 327 if xrep_port <= 0:
457 328 xrep_port = ports.pop(0)
458 329 if pub_port <= 0:
459 330 pub_port = ports.pop(0)
460 331 if req_port <= 0:
461 332 req_port = ports.pop(0)
462 333
463 334 # Spawn a kernel.
464 335 command = 'from IPython.zmq.kernel import main; main()'
465 336 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
466 337 '--pub', str(pub_port), '--req', str(req_port) ]
467 338 if independent:
468 339 if sys.platform == 'win32':
469 340 proc = Popen(['start', '/b'] + arguments, shell=True)
470 341 else:
471 342 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
472 343 else:
473 344 if sys.platform == 'win32':
474 345 from _subprocess import DuplicateHandle, GetCurrentProcess, \
475 346 DUPLICATE_SAME_ACCESS
476 347 pid = GetCurrentProcess()
477 348 handle = DuplicateHandle(pid, pid, pid, 0,
478 349 True, # Inheritable by new processes.
479 350 DUPLICATE_SAME_ACCESS)
480 351 proc = Popen(arguments + ['--parent', str(int(handle))])
481 352 else:
482 353 proc = Popen(arguments + ['--parent'])
483 354
484 355 return proc, xrep_port, pub_port, req_port
485 356
486 357
487 358 if __name__ == '__main__':
488 359 main()
General Comments 0
You need to be logged in to leave comments. Login now