##// END OF EJS Templates
Separating kernel into smaller pieces.
Brian Granger -
Show More
@@ -0,0 +1,22 b''
1 import __builtin__
2
3 from session import extract_header
4
5 class DisplayHook(object):
6
7 def __init__(self, session, pub_socket):
8 self.session = session
9 self.pub_socket = pub_socket
10 self.parent_header = {}
11
12 def __call__(self, obj):
13 if obj is None:
14 return
15
16 __builtin__._ = obj
17 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
18 parent=self.parent_header)
19 self.pub_socket.send_json(msg)
20
21 def set_parent(self, parent):
22 self.parent_header = extract_header(parent) No newline at end of file
@@ -0,0 +1,42 b''
1 import os
2 import time
3 from threading import Thread
4
5
6 class ExitPollerUnix(Thread):
7 """ A Unix-specific daemon thread that terminates the program immediately
8 when the parent process no longer exists.
9 """
10
11 def __init__(self):
12 super(ExitPollerUnix, self).__init__()
13 self.daemon = True
14
15 def run(self):
16 # We cannot use os.waitpid because it works only for child processes.
17 from errno import EINTR
18 while True:
19 try:
20 if os.getppid() == 1:
21 os._exit(1)
22 time.sleep(1.0)
23 except OSError, e:
24 if e.errno == EINTR:
25 continue
26 raise
27
28 class ExitPollerWindows(Thread):
29 """ A Windows-specific daemon thread that terminates the program immediately
30 when a Win32 handle is signaled.
31 """
32
33 def __init__(self, handle):
34 super(ExitPollerWindows, self).__init__()
35 self.daemon = True
36 self.handle = handle
37
38 def run(self):
39 from _subprocess import WaitForSingleObject, WAIT_OBJECT_0, INFINITE
40 result = WaitForSingleObject(self.handle, INFINITE)
41 if result == WAIT_OBJECT_0:
42 os._exit(1) No newline at end of file
@@ -0,0 +1,77 b''
1 import sys
2 import time
3 from cStringIO import StringIO
4
5 from session import extract_header, Message
6
7 #-----------------------------------------------------------------------------
8 # Stream classes
9 #-----------------------------------------------------------------------------
10
11 class OutStream(object):
12 """A file like object that publishes the stream to a 0MQ PUB socket."""
13
14 # The time interval between automatic flushes, in seconds.
15 flush_interval = 0.05
16
17 def __init__(self, session, pub_socket, name):
18 self.session = session
19 self.pub_socket = pub_socket
20 self.name = name
21 self.parent_header = {}
22 self._new_buffer()
23
24 def set_parent(self, parent):
25 self.parent_header = extract_header(parent)
26
27 def close(self):
28 self.pub_socket = None
29
30 def flush(self):
31 if self.pub_socket is None:
32 raise ValueError(u'I/O operation on closed file')
33 else:
34 data = self._buffer.getvalue()
35 if data:
36 content = {u'name':self.name, u'data':data}
37 msg = self.session.msg(u'stream', content=content,
38 parent=self.parent_header)
39 print>>sys.__stdout__, Message(msg)
40 self.pub_socket.send_json(msg)
41
42 self._buffer.close()
43 self._new_buffer()
44
45 def isatty(self):
46 return False
47
48 def next(self):
49 raise IOError('Read not supported on a write only stream.')
50
51 def read(self, size=-1):
52 raise IOError('Read not supported on a write only stream.')
53
54 def readline(self, size=-1):
55 raise IOError('Read not supported on a write only stream.')
56
57 def write(self, string):
58 if self.pub_socket is None:
59 raise ValueError('I/O operation on closed file')
60 else:
61 self._buffer.write(string)
62 current_time = time.time()
63 if self._start <= 0:
64 self._start = current_time
65 elif current_time - self._start > self.flush_interval:
66 self.flush()
67
68 def writelines(self, sequence):
69 if self.pub_socket is None:
70 raise ValueError('I/O operation on closed file')
71 else:
72 for string in sequence:
73 self.write(string)
74
75 def _new_buffer(self):
76 self._buffer = StringIO()
77 self._start = -1 No newline at end of file
@@ -3,7 +3,6 b''
3
3
4 Things to do:
4 Things to do:
5
5
6 * Finish implementing `raw_input`.
7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 call set_parent on all the PUB objects with the message about to be executed.
7 call set_parent on all the PUB objects with the message about to be executed.
9 * Implement random port and security key logic.
8 * Implement random port and security key logic.
@@ -18,10 +17,8 b' Things to do:'
18 # Standard library imports.
17 # Standard library imports.
19 import __builtin__
18 import __builtin__
20 from code import CommandCompiler
19 from code import CommandCompiler
21 from cStringIO import StringIO
22 import os
20 import os
23 import sys
21 import sys
24 from threading import Thread
25 import time
22 import time
26 import traceback
23 import traceback
27
24
@@ -30,102 +27,16 b' import zmq'
30
27
31 # Local imports.
28 # Local imports.
32 from IPython.external.argparse import ArgumentParser
29 from IPython.external.argparse import ArgumentParser
33 from session import Session, Message, extract_header
30 from session import Session, Message
34 from completer import KernelCompleter
31 from completer import KernelCompleter
32 from .iostream import OutStream
33 from .displayhook import DisplayHook
34 from .exitpoller import ExitPollerUnix, ExitPollerWindows
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Kernel and stream classes
37 # Main kernel class
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 class OutStream(object):
41 """A file like object that publishes the stream to a 0MQ PUB socket."""
42
43 # The time interval between automatic flushes, in seconds.
44 flush_interval = 0.05
45
46 def __init__(self, session, pub_socket, name):
47 self.session = session
48 self.pub_socket = pub_socket
49 self.name = name
50 self.parent_header = {}
51 self._new_buffer()
52
53 def set_parent(self, parent):
54 self.parent_header = extract_header(parent)
55
56 def close(self):
57 self.pub_socket = None
58
59 def flush(self):
60 if self.pub_socket is None:
61 raise ValueError(u'I/O operation on closed file')
62 else:
63 data = self._buffer.getvalue()
64 if data:
65 content = {u'name':self.name, u'data':data}
66 msg = self.session.msg(u'stream', content=content,
67 parent=self.parent_header)
68 print>>sys.__stdout__, Message(msg)
69 self.pub_socket.send_json(msg)
70
71 self._buffer.close()
72 self._new_buffer()
73
74 def isatty(self):
75 return False
76
77 def next(self):
78 raise IOError('Read not supported on a write only stream.')
79
80 def read(self, size=-1):
81 raise IOError('Read not supported on a write only stream.')
82
83 def readline(self, size=-1):
84 raise IOError('Read not supported on a write only stream.')
85
86 def write(self, string):
87 if self.pub_socket is None:
88 raise ValueError('I/O operation on closed file')
89 else:
90 self._buffer.write(string)
91 current_time = time.time()
92 if self._start <= 0:
93 self._start = current_time
94 elif current_time - self._start > self.flush_interval:
95 self.flush()
96
97 def writelines(self, sequence):
98 if self.pub_socket is None:
99 raise ValueError('I/O operation on closed file')
100 else:
101 for string in sequence:
102 self.write(string)
103
104 def _new_buffer(self):
105 self._buffer = StringIO()
106 self._start = -1
107
108
109 class DisplayHook(object):
110
111 def __init__(self, session, pub_socket):
112 self.session = session
113 self.pub_socket = pub_socket
114 self.parent_header = {}
115
116 def __call__(self, obj):
117 if obj is None:
118 return
119
120 __builtin__._ = obj
121 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
122 parent=self.parent_header)
123 self.pub_socket.send_json(msg)
124
125 def set_parent(self, parent):
126 self.parent_header = extract_header(parent)
127
128
129 class Kernel(object):
40 class Kernel(object):
130
41
131 def __init__(self, session, reply_socket, pub_socket, req_socket):
42 def __init__(self, session, reply_socket, pub_socket, req_socket):
@@ -190,7 +101,6 b' class Kernel(object):'
190
101
191 exec comp_code in self.user_ns, self.user_ns
102 exec comp_code in self.user_ns, self.user_ns
192 except:
103 except:
193 result = u'error'
194 etype, evalue, tb = sys.exc_info()
104 etype, evalue, tb = sys.exc_info()
195 tb = traceback.format_exception(etype, evalue, tb)
105 tb = traceback.format_exception(etype, evalue, tb)
196 exc_content = {
106 exc_content = {
@@ -302,45 +212,6 b' class Kernel(object):'
302 # Kernel main and launch functions
212 # Kernel main and launch functions
303 #-----------------------------------------------------------------------------
213 #-----------------------------------------------------------------------------
304
214
305 class ExitPollerUnix(Thread):
306 """ A Unix-specific daemon thread that terminates the program immediately
307 when the parent process no longer exists.
308 """
309
310 def __init__(self):
311 super(ExitPollerUnix, self).__init__()
312 self.daemon = True
313
314 def run(self):
315 # We cannot use os.waitpid because it works only for child processes.
316 from errno import EINTR
317 while True:
318 try:
319 if os.getppid() == 1:
320 os._exit(1)
321 time.sleep(1.0)
322 except OSError, e:
323 if e.errno == EINTR:
324 continue
325 raise
326
327 class ExitPollerWindows(Thread):
328 """ A Windows-specific daemon thread that terminates the program immediately
329 when a Win32 handle is signaled.
330 """
331
332 def __init__(self, handle):
333 super(ExitPollerWindows, self).__init__()
334 self.daemon = True
335 self.handle = handle
336
337 def run(self):
338 from _subprocess import WaitForSingleObject, WAIT_OBJECT_0, INFINITE
339 result = WaitForSingleObject(self.handle, INFINITE)
340 if result == WAIT_OBJECT_0:
341 os._exit(1)
342
343
344 def bind_port(socket, ip, port):
215 def bind_port(socket, ip, port):
345 """ Binds the specified ZMQ socket. If the port is less than zero, a random
216 """ Binds the specified ZMQ socket. If the port is less than zero, a random
346 port is chosen. Returns the port that was bound.
217 port is chosen. Returns the port that was bound.
General Comments 0
You need to be logged in to leave comments. Login now