##// END OF EJS Templates
Work on the kernel manager.
Brian Granger -
Show More
@@ -0,0 +1,271 b''
1 """Kernel frontend classes.
2
3 To do:
4
5 1. Create custom channel subclasses for Qt.
6 2. Create logger to handle debugging and console messages.
7
8 """
9
10 from Queue import Queue, Empty
11 from threading import Thread
12 import traceback
13
14 import zmq
15 from zmq import POLLIN, POLLOUT, POLLERR
16 from zmq.eventloop import ioloop
17 from session import Session
18
19
20 class MissingHandlerError(Exception):
21 pass
22
23
24 class KernelManager(object):
25
26 def __init__(self, xreq_addr, sub_addr, rep_addr,
27 context=None, session=None):
28 self.context = zmq.Context() if context is None else context
29 self.session = Session() if session is None else session
30 self.xreq_addr = xreq_addr
31 self.sub_addr = sub_addr
32 self.rep_addr = rep_addr
33
34 def start_kernel(self):
35 """Start a localhost kernel on ip and port.
36
37 The SUB channel is for the frontend to receive messages published by
38 the kernel.
39
40 The REQ channel is for the frontend to make requests of the kernel.
41
42 The REP channel is for the kernel to request stdin (raw_input) from
43 the frontend.
44 """
45
46 def kill_kernel(self):
47 """Kill the running kernel"""
48
49 def is_alive(self):
50 """Is the kernel alive?"""
51 return True
52
53 def signal_kernel(self, signum):
54 """Send signum to the kernel."""
55
56 def get_sub_channel(self):
57 """Get the SUB socket channel object."""
58 return SubSocketChannel(self.context, self.session, self.sub_addr)
59
60 def get_xreq_channel(self):
61 """Get the REQ socket channel object to make requests of the kernel."""
62 return XReqSocketChannel(self.context, self.session, self.xreq_addr)
63
64 def get_rep_channel(self):
65 """Get the REP socket channel object to handle stdin (raw_input)."""
66 return RepSocketChannel(self.context, self.session, self.rep_addr)
67
68
69 class ZmqSocketChannel(Thread):
70
71 socket = None
72
73 def __init__(self, context, session, addr):
74 self.context = context
75 self.session = session
76 self.addr = addr
77 super(ZmqSocketChannel, self).__init__()
78 self.daemon = True
79
80
81 class SubSocketChannel(ZmqSocketChannel):
82
83 handlers = None
84 _overriden_call_handler = None
85
86 def __init__(self, context, session, addr):
87 self.handlers = {}
88 super(SubSocketChannel, self).__init__(context, session, addr)
89
90 def run(self):
91 self.socket = self.context.socket(zmq.SUB)
92 self.socket.setsockopt(zmq.SUBSCRIBE,'')
93 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
94 self.socket.connect('tcp://%s:%i' % self.addr)
95 self.ioloop = ioloop.IOLoop()
96 self.ioloop.add_handler(self.socket, self._handle_events,
97 POLLIN|POLLERR)
98 self.ioloop.start()
99
100 def _handle_events(self, socket, events):
101 # Turn on and off POLLOUT depending on if we have made a request
102 if events & POLLERR:
103 self._handle_err()
104 if events & POLLIN:
105 self._handle_recv()
106
107 def _handle_err(self):
108 raise zmq.ZmqError()
109
110 def _handle_recv(self):
111 msg = self.socket.recv_json()
112 self.call_handlers(msg)
113
114 def override_call_handler(self, func):
115 """Permanently override the call_handler.
116
117 The function func will be called as::
118
119 func(handler, msg)
120
121 And must call::
122
123 handler(msg)
124
125 in the main thread.
126 """
127 assert callable(func), "not a callable: %r" % func
128 self._overriden_call_handler = func
129
130 def call_handlers(self, msg):
131 handler = self.handlers.get(msg['msg_type'], None)
132 if handler is not None:
133 try:
134 self.call_handler(handler, msg)
135 except:
136 # XXX: This should be logged at least
137 traceback.print_last()
138
139 def call_handler(self, handler, msg):
140 if self._overriden_call_handler is not None:
141 self._overriden_call_handler(handler, msg)
142 elif hasattr(self, '_call_handler'):
143 call_handler = getattr(self, '_call_handler')
144 call_handler(handler, msg)
145 else:
146 raise RuntimeError('no handler!')
147
148 def add_handler(self, callback, msg_type):
149 """Register a callback for msg type."""
150 self.handlers[msg_type] = callback
151
152 def remove_handler(self, msg_type):
153 self.handlers.pop(msg_type, None)
154
155
156 class XReqSocketChannel(ZmqSocketChannel):
157
158 handler_queue = None
159 command_queue = None
160 handlers = None
161 _overriden_call_handler = None
162
163 def __init__(self, context, session, addr):
164 self.handlers = {}
165 self.handler_queue = Queue()
166 self.command_queue = Queue()
167 super(XReqSocketChannel, self).__init__(context, session, addr)
168
169 def run(self):
170 self.socket = self.context.socket(zmq.XREQ)
171 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
172 self.socket.connect('tcp://%s:%i' % self.addr)
173 self.ioloop = ioloop.IOLoop()
174 self.ioloop.add_handler(self.socket, self._handle_events,
175 POLLIN|POLLOUT|POLLERR)
176 self.ioloop.start()
177
178 def _handle_events(self, socket, events):
179 # Turn on and off POLLOUT depending on if we have made a request
180 if events & POLLERR:
181 self._handle_err()
182 if events & POLLOUT:
183 self._handle_send()
184 if events & POLLIN:
185 self._handle_recv()
186
187 def _handle_recv(self):
188 msg = self.socket.recv_json()
189 print "Got reply:", msg
190 try:
191 handler = self.handler_queue.get(False)
192 except Empty:
193 print "Message received with no handler!!!"
194 print msg
195 else:
196 self.call_handler(handler, msg)
197
198 def _handle_send(self):
199 try:
200 msg = self.command_queue.get(False)
201 except Empty:
202 pass
203 else:
204 self.socket.send_json(msg)
205
206 def _handle_err(self):
207 raise zmq.ZmqError()
208
209 def _queue_request(self, msg, callback):
210 handler = self._find_handler(msg['msg_type'], callback)
211 self.handler_queue.put(handler)
212 self.command_queue.put(msg)
213
214 def execute(self, code, callback=None):
215 # Create class for content/msg creation. Related to, but possibly
216 # not in Session.
217 content = dict(code=code)
218 msg = self.session.msg('execute_request', content)
219 self._queue_request(msg, callback)
220 return msg['header']['msg_id']
221
222 def complete(self, text, line, block=None, callback=None):
223 content = dict(text=text, line=line)
224 msg = self.session.msg('complete_request', content)
225 return self._queue_request(msg, callback)
226 return msg['header']['msg_id']
227
228 def object_info(self, oname, callback=None):
229 content = dict(oname=oname)
230 msg = self.session.msg('object_info_request', content)
231 return self._queue_request(msg, callback)
232 return msg['header']['msg_id']
233
234 def _find_handler(self, name, callback):
235 if callback is not None:
236 return callback
237 handler = self.handlers.get(name)
238 if handler is None:
239 raise MissingHandlerError('No handler defined for method: %s' % name)
240 return handler
241
242 def override_call_handler(self, func):
243 """Permanently override the call_handler.
244
245 The function func will be called as::
246
247 func(handler, msg)
248
249 And must call::
250
251 handler(msg)
252
253 in the main thread.
254 """
255 assert callable(func), "not a callable: %r" % func
256 self._overriden_call_handler = func
257
258 def call_handler(self, handler, msg):
259 if self._overriden_call_handler is not None:
260 self._overriden_call_handler(handler, msg)
261 elif hasattr(self, '_call_handler'):
262 call_handler = getattr(self, '_call_handler')
263 call_handler(handler, msg)
264 else:
265 raise RuntimeError('no handler!')
266
267
268 class RepSocketChannel(ZmqSocketChannel):
269
270 def on_raw_input():
271 pass
@@ -0,0 +1,52 b''
1 from Queue import Queue, Empty
2 import time
3
4 from kernelmanager import KernelManager
5
6 xreq_addr = ('127.0.0.1',5575)
7 sub_addr = ('127.0.0.1', 5576)
8 rep_addr = ('127.0.0.1', 5577)
9
10
11 km = KernelManager(xreq_addr, sub_addr, rep_addr)
12 # xreq_channel = km.get_xreq_channel()
13 sub_channel = km.get_sub_channel()
14
15 # xreq_channel.start()
16 sub_channel.start()
17
18 print "Channels are started"
19
20 def printer(msg):
21 print
22 print msg
23
24 class CallHandler(object):
25
26 def __init__(self):
27 self.queue = Queue()
28
29 def __call__(self, handler, msg):
30 self.queue.put((handler, msg))
31
32 def handle(self):
33 try:
34 handler, msg = self.queue.get(block=False)
35 except Empty:
36 pass
37 else:
38 handler(msg)
39
40 call_handler = CallHandler()
41 sub_channel.override_call_handler(call_handler)
42 sub_channel.add_handler(printer, 'pyin')
43 sub_channel.add_handler(printer, 'pyout')
44 sub_channel.add_handler(printer, 'stdout')
45 sub_channel.add_handler(printer, 'stderr')
46
47 for i in range(100):
48 call_handler.handle()
49 time.sleep(1)
50
51 # xreq_channel.join()
52 sub_channel.join() No newline at end of file
@@ -1,270 +1,272 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Finish implementing `raw_input`.
7 7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 8 call set_parent on all the PUB objects with the message about to be executed.
9 9 * Implement random port and security key logic.
10 10 * Implement control messages.
11 11 * Implement event loop and poll version.
12 12 """
13 13
14 14 import __builtin__
15 15 import sys
16 16 import time
17 17 import traceback
18 18
19 19 from code import CommandCompiler
20 20
21 21 import zmq
22 22
23 23 from session import Session, Message, extract_header
24 24 from completer import KernelCompleter
25 25
26 26 class OutStream(object):
27 27 """A file like object that publishes the stream to a 0MQ PUB socket."""
28 28
29 29 def __init__(self, session, pub_socket, name, max_buffer=200):
30 30 self.session = session
31 31 self.pub_socket = pub_socket
32 32 self.name = name
33 33 self._buffer = []
34 34 self._buffer_len = 0
35 35 self.max_buffer = max_buffer
36 36 self.parent_header = {}
37 37
38 38 def set_parent(self, parent):
39 39 self.parent_header = extract_header(parent)
40 40
41 41 def close(self):
42 42 self.pub_socket = None
43 43
44 44 def flush(self):
45 45 if self.pub_socket is None:
46 46 raise ValueError(u'I/O operation on closed file')
47 47 else:
48 48 if self._buffer:
49 49 data = ''.join(self._buffer)
50 50 content = {u'name':self.name, u'data':data}
51 51 msg = self.session.msg(u'stream', content=content,
52 52 parent=self.parent_header)
53 53 print>>sys.__stdout__, Message(msg)
54 54 self.pub_socket.send_json(msg)
55 55 self._buffer_len = 0
56 56 self._buffer = []
57 57
58 58 def isattr(self):
59 59 return False
60 60
61 61 def next(self):
62 62 raise IOError('Read not supported on a write only stream.')
63 63
64 64 def read(self, size=None):
65 65 raise IOError('Read not supported on a write only stream.')
66 66
67 67 readline=read
68 68
69 69 def write(self, s):
70 70 if self.pub_socket is None:
71 71 raise ValueError('I/O operation on closed file')
72 72 else:
73 73 self._buffer.append(s)
74 74 self._buffer_len += len(s)
75 75 self._maybe_send()
76 76
77 77 def _maybe_send(self):
78 78 if '\n' in self._buffer[-1]:
79 79 self.flush()
80 80 if self._buffer_len > self.max_buffer:
81 81 self.flush()
82 82
83 83 def writelines(self, sequence):
84 84 if self.pub_socket is None:
85 85 raise ValueError('I/O operation on closed file')
86 86 else:
87 87 for s in sequence:
88 88 self.write(s)
89 89
90 90
91 91 class DisplayHook(object):
92 92
93 93 def __init__(self, session, pub_socket):
94 94 self.session = session
95 95 self.pub_socket = pub_socket
96 96 self.parent_header = {}
97 97
98 98 def __call__(self, obj):
99 99 if obj is None:
100 100 return
101 101
102 102 __builtin__._ = obj
103 103 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
104 104 parent=self.parent_header)
105 105 self.pub_socket.send_json(msg)
106 106
107 107 def set_parent(self, parent):
108 108 self.parent_header = extract_header(parent)
109 109
110 110
111 111 class RawInput(object):
112 112
113 113 def __init__(self, session, socket):
114 114 self.session = session
115 115 self.socket = socket
116 116
117 117 def __call__(self, prompt=None):
118 118 msg = self.session.msg(u'raw_input')
119 119 self.socket.send_json(msg)
120 120 while True:
121 121 try:
122 122 reply = self.socket.recv_json(zmq.NOBLOCK)
123 123 except zmq.ZMQError, e:
124 124 if e.errno == zmq.EAGAIN:
125 125 pass
126 126 else:
127 127 raise
128 128 else:
129 129 break
130 130 return reply[u'content'][u'data']
131 131
132 132
133 133 class Kernel(object):
134 134
135 135 def __init__(self, session, reply_socket, pub_socket):
136 136 self.session = session
137 137 self.reply_socket = reply_socket
138 138 self.pub_socket = pub_socket
139 139 self.user_ns = {}
140 140 self.history = []
141 141 self.compiler = CommandCompiler()
142 142 self.completer = KernelCompleter(self.user_ns)
143 143
144 144 # Build dict of handlers for message types
145 145 self.handlers = {}
146 146 for msg_type in ['execute_request', 'complete_request']:
147 147 self.handlers[msg_type] = getattr(self, msg_type)
148 148
149 149 def abort_queue(self):
150 150 while True:
151 151 try:
152 152 ident = self.reply_socket.recv(zmq.NOBLOCK)
153 153 except zmq.ZMQError, e:
154 154 if e.errno == zmq.EAGAIN:
155 155 break
156 156 else:
157 157 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
158 158 msg = self.reply_socket.recv_json()
159 159 print>>sys.__stdout__, "Aborting:"
160 160 print>>sys.__stdout__, Message(msg)
161 161 msg_type = msg['msg_type']
162 162 reply_type = msg_type.split('_')[0] + '_reply'
163 163 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
164 164 print>>sys.__stdout__, Message(reply_msg)
165 165 self.reply_socket.send(ident,zmq.SNDMORE)
166 166 self.reply_socket.send_json(reply_msg)
167 167 # We need to wait a bit for requests to come in. This can probably
168 168 # be set shorter for true asynchronous clients.
169 169 time.sleep(0.1)
170 170
171 171 def execute_request(self, ident, parent):
172 172 try:
173 173 code = parent[u'content'][u'code']
174 174 except:
175 175 print>>sys.__stderr__, "Got bad msg: "
176 176 print>>sys.__stderr__, Message(parent)
177 177 return
178 178 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
179 179 self.pub_socket.send_json(pyin_msg)
180 180 try:
181 181 comp_code = self.compiler(code, '<zmq-kernel>')
182 182 sys.displayhook.set_parent(parent)
183 183 exec comp_code in self.user_ns, self.user_ns
184 184 except:
185 185 result = u'error'
186 186 etype, evalue, tb = sys.exc_info()
187 187 tb = traceback.format_exception(etype, evalue, tb)
188 188 exc_content = {
189 189 u'status' : u'error',
190 190 u'traceback' : tb,
191 191 u'etype' : unicode(etype),
192 192 u'evalue' : unicode(evalue)
193 193 }
194 194 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
195 195 self.pub_socket.send_json(exc_msg)
196 196 reply_content = exc_content
197 197 else:
198 198 reply_content = {'status' : 'ok'}
199 199 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
200 200 print>>sys.__stdout__, Message(reply_msg)
201 201 self.reply_socket.send(ident, zmq.SNDMORE)
202 202 self.reply_socket.send_json(reply_msg)
203 203 if reply_msg['content']['status'] == u'error':
204 204 self.abort_queue()
205 205
206 206 def complete_request(self, ident, parent):
207 207 matches = {'matches' : self.complete(parent),
208 208 'status' : 'ok'}
209 209 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
210 210 matches, parent, ident)
211 211 print >> sys.__stdout__, completion_msg
212 212
213 213 def complete(self, msg):
214 214 return self.completer.complete(msg.content.line, msg.content.text)
215 215
216 216 def start(self):
217 217 while True:
218 218 ident = self.reply_socket.recv()
219 219 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
220 220 msg = self.reply_socket.recv_json()
221 221 omsg = Message(msg)
222 print>>sys.__stdout__
222 223 print>>sys.__stdout__, omsg
223 224 handler = self.handlers.get(omsg.msg_type, None)
224 225 if handler is None:
225 226 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
226 227 else:
227 228 handler(ident, omsg)
228 229
229 230
230 231 def main():
231 232 c = zmq.Context()
232 233
233 234 ip = '127.0.0.1'
234 235 port_base = 5575
235 236 connection = ('tcp://%s' % ip) + ':%i'
236 237 rep_conn = connection % port_base
237 238 pub_conn = connection % (port_base+1)
238 239
239 240 print >>sys.__stdout__, "Starting the kernel..."
240 print >>sys.__stdout__, "On:",rep_conn, pub_conn
241 print >>sys.__stdout__, "XREP Channel:", rep_conn
242 print >>sys.__stdout__, "PUB Channel:", pub_conn
241 243
242 244 session = Session(username=u'kernel')
243 245
244 246 reply_socket = c.socket(zmq.XREP)
245 247 reply_socket.bind(rep_conn)
246 248
247 249 pub_socket = c.socket(zmq.PUB)
248 250 pub_socket.bind(pub_conn)
249 251
250 252 stdout = OutStream(session, pub_socket, u'stdout')
251 253 stderr = OutStream(session, pub_socket, u'stderr')
252 254 sys.stdout = stdout
253 255 sys.stderr = stderr
254 256
255 257 display_hook = DisplayHook(session, pub_socket)
256 258 sys.displayhook = display_hook
257 259
258 260 kernel = Kernel(session, reply_socket, pub_socket)
259 261
260 262 # For debugging convenience, put sleep and a string in the namespace, so we
261 263 # have them every time we start.
262 264 kernel.user_ns['sleep'] = time.sleep
263 265 kernel.user_ns['s'] = 'Test string'
264 266
265 267 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
266 268 kernel.start()
267 269
268 270
269 271 if __name__ == '__main__':
270 272 main()
@@ -1,119 +1,122 b''
1 1 import os
2 2 import uuid
3 3 import pprint
4 4
5 5 import zmq
6 6
7 7 class Message(object):
8 8 """A simple message object that maps dict keys to attributes.
9 9
10 10 A Message can be created from a dict and a dict from a Message instance
11 11 simply by calling dict(msg_obj)."""
12 12
13 13 def __init__(self, msg_dict):
14 14 dct = self.__dict__
15 15 for k, v in msg_dict.iteritems():
16 16 if isinstance(v, dict):
17 17 v = Message(v)
18 18 dct[k] = v
19 19
20 20 # Having this iterator lets dict(msg_obj) work out of the box.
21 21 def __iter__(self):
22 22 return iter(self.__dict__.iteritems())
23 23
24 24 def __repr__(self):
25 25 return repr(self.__dict__)
26 26
27 27 def __str__(self):
28 28 return pprint.pformat(self.__dict__)
29 29
30 30 def __contains__(self, k):
31 31 return k in self.__dict__
32 32
33 33 def __getitem__(self, k):
34 34 return self.__dict__[k]
35 35
36 36
37 37 def msg_header(msg_id, username, session):
38 38 return {
39 39 'msg_id' : msg_id,
40 40 'username' : username,
41 41 'session' : session
42 42 }
43 43
44 44
45 45 def extract_header(msg_or_header):
46 46 """Given a message or header, return the header."""
47 47 if not msg_or_header:
48 48 return {}
49 49 try:
50 50 # See if msg_or_header is the entire message.
51 51 h = msg_or_header['header']
52 52 except KeyError:
53 53 try:
54 54 # See if msg_or_header is just the header
55 55 h = msg_or_header['msg_id']
56 56 except KeyError:
57 57 raise
58 58 else:
59 59 h = msg_or_header
60 60 if not isinstance(h, dict):
61 61 h = dict(h)
62 62 return h
63 63
64 64
65 65 class Session(object):
66 66
67 def __init__(self, username=os.environ.get('USER','username')):
67 def __init__(self, username=os.environ.get('USER','username'), session=None):
68 68 self.username = username
69 if session is None:
69 70 self.session = str(uuid.uuid4())
71 else:
72 self.session = session
70 73 self.msg_id = 0
71 74
72 75 def msg_header(self):
73 76 h = msg_header(self.msg_id, self.username, self.session)
74 77 self.msg_id += 1
75 78 return h
76 79
77 80 def msg(self, msg_type, content=None, parent=None):
78 81 msg = {}
79 82 msg['header'] = self.msg_header()
80 83 msg['parent_header'] = {} if parent is None else extract_header(parent)
81 84 msg['msg_type'] = msg_type
82 85 msg['content'] = {} if content is None else content
83 86 return msg
84 87
85 88 def send(self, socket, msg_type, content=None, parent=None, ident=None):
86 89 msg = self.msg(msg_type, content, parent)
87 90 if ident is not None:
88 91 socket.send(ident, zmq.SNDMORE)
89 92 socket.send_json(msg)
90 93 omsg = Message(msg)
91 94 return omsg
92 95
93 96 def recv(self, socket, mode=zmq.NOBLOCK):
94 97 try:
95 98 msg = socket.recv_json(mode)
96 99 except zmq.ZMQError, e:
97 100 if e.errno == zmq.EAGAIN:
98 101 # We can convert EAGAIN to None as we know in this case
99 102 # recv_json won't return None.
100 103 return None
101 104 else:
102 105 raise
103 106 return Message(msg)
104 107
105 108 def test_msg2obj():
106 109 am = dict(x=1)
107 110 ao = Message(am)
108 111 assert ao.x == am['x']
109 112
110 113 am['y'] = dict(z=1)
111 114 ao = Message(am)
112 115 assert ao.y.z == am['y']['z']
113 116
114 117 k1, k2 = 'y', 'z'
115 118 assert ao[k1][k2] == am[k1][k2]
116 119
117 120 am2 = dict(ao)
118 121 assert am['x'] == am2['x']
119 122 assert am['y']['z'] == am2['y']['z']
General Comments 0
You need to be logged in to leave comments. Login now