##// END OF EJS Templates
Work on the kernel manager.
Brian Granger -
Show More
@@ -0,0 +1,271 b''
1 """Kernel frontend classes.
2
3 To do:
4
5 1. Create custom channel subclasses for Qt.
6 2. Create logger to handle debugging and console messages.
7
8 """
9
10 from Queue import Queue, Empty
11 from threading import Thread
12 import traceback
13
14 import zmq
15 from zmq import POLLIN, POLLOUT, POLLERR
16 from zmq.eventloop import ioloop
17 from session import Session
18
19
20 class MissingHandlerError(Exception):
21 pass
22
23
24 class KernelManager(object):
25
26 def __init__(self, xreq_addr, sub_addr, rep_addr,
27 context=None, session=None):
28 self.context = zmq.Context() if context is None else context
29 self.session = Session() if session is None else session
30 self.xreq_addr = xreq_addr
31 self.sub_addr = sub_addr
32 self.rep_addr = rep_addr
33
34 def start_kernel(self):
35 """Start a localhost kernel on ip and port.
36
37 The SUB channel is for the frontend to receive messages published by
38 the kernel.
39
40 The REQ channel is for the frontend to make requests of the kernel.
41
42 The REP channel is for the kernel to request stdin (raw_input) from
43 the frontend.
44 """
45
46 def kill_kernel(self):
47 """Kill the running kernel"""
48
49 def is_alive(self):
50 """Is the kernel alive?"""
51 return True
52
53 def signal_kernel(self, signum):
54 """Send signum to the kernel."""
55
56 def get_sub_channel(self):
57 """Get the SUB socket channel object."""
58 return SubSocketChannel(self.context, self.session, self.sub_addr)
59
60 def get_xreq_channel(self):
61 """Get the REQ socket channel object to make requests of the kernel."""
62 return XReqSocketChannel(self.context, self.session, self.xreq_addr)
63
64 def get_rep_channel(self):
65 """Get the REP socket channel object to handle stdin (raw_input)."""
66 return RepSocketChannel(self.context, self.session, self.rep_addr)
67
68
69 class ZmqSocketChannel(Thread):
70
71 socket = None
72
73 def __init__(self, context, session, addr):
74 self.context = context
75 self.session = session
76 self.addr = addr
77 super(ZmqSocketChannel, self).__init__()
78 self.daemon = True
79
80
81 class SubSocketChannel(ZmqSocketChannel):
82
83 handlers = None
84 _overriden_call_handler = None
85
86 def __init__(self, context, session, addr):
87 self.handlers = {}
88 super(SubSocketChannel, self).__init__(context, session, addr)
89
90 def run(self):
91 self.socket = self.context.socket(zmq.SUB)
92 self.socket.setsockopt(zmq.SUBSCRIBE,'')
93 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
94 self.socket.connect('tcp://%s:%i' % self.addr)
95 self.ioloop = ioloop.IOLoop()
96 self.ioloop.add_handler(self.socket, self._handle_events,
97 POLLIN|POLLERR)
98 self.ioloop.start()
99
100 def _handle_events(self, socket, events):
101 # Turn on and off POLLOUT depending on if we have made a request
102 if events & POLLERR:
103 self._handle_err()
104 if events & POLLIN:
105 self._handle_recv()
106
107 def _handle_err(self):
108 raise zmq.ZmqError()
109
110 def _handle_recv(self):
111 msg = self.socket.recv_json()
112 self.call_handlers(msg)
113
114 def override_call_handler(self, func):
115 """Permanently override the call_handler.
116
117 The function func will be called as::
118
119 func(handler, msg)
120
121 And must call::
122
123 handler(msg)
124
125 in the main thread.
126 """
127 assert callable(func), "not a callable: %r" % func
128 self._overriden_call_handler = func
129
130 def call_handlers(self, msg):
131 handler = self.handlers.get(msg['msg_type'], None)
132 if handler is not None:
133 try:
134 self.call_handler(handler, msg)
135 except:
136 # XXX: This should be logged at least
137 traceback.print_last()
138
139 def call_handler(self, handler, msg):
140 if self._overriden_call_handler is not None:
141 self._overriden_call_handler(handler, msg)
142 elif hasattr(self, '_call_handler'):
143 call_handler = getattr(self, '_call_handler')
144 call_handler(handler, msg)
145 else:
146 raise RuntimeError('no handler!')
147
148 def add_handler(self, callback, msg_type):
149 """Register a callback for msg type."""
150 self.handlers[msg_type] = callback
151
152 def remove_handler(self, msg_type):
153 self.handlers.pop(msg_type, None)
154
155
156 class XReqSocketChannel(ZmqSocketChannel):
157
158 handler_queue = None
159 command_queue = None
160 handlers = None
161 _overriden_call_handler = None
162
163 def __init__(self, context, session, addr):
164 self.handlers = {}
165 self.handler_queue = Queue()
166 self.command_queue = Queue()
167 super(XReqSocketChannel, self).__init__(context, session, addr)
168
169 def run(self):
170 self.socket = self.context.socket(zmq.XREQ)
171 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
172 self.socket.connect('tcp://%s:%i' % self.addr)
173 self.ioloop = ioloop.IOLoop()
174 self.ioloop.add_handler(self.socket, self._handle_events,
175 POLLIN|POLLOUT|POLLERR)
176 self.ioloop.start()
177
178 def _handle_events(self, socket, events):
179 # Turn on and off POLLOUT depending on if we have made a request
180 if events & POLLERR:
181 self._handle_err()
182 if events & POLLOUT:
183 self._handle_send()
184 if events & POLLIN:
185 self._handle_recv()
186
187 def _handle_recv(self):
188 msg = self.socket.recv_json()
189 print "Got reply:", msg
190 try:
191 handler = self.handler_queue.get(False)
192 except Empty:
193 print "Message received with no handler!!!"
194 print msg
195 else:
196 self.call_handler(handler, msg)
197
198 def _handle_send(self):
199 try:
200 msg = self.command_queue.get(False)
201 except Empty:
202 pass
203 else:
204 self.socket.send_json(msg)
205
206 def _handle_err(self):
207 raise zmq.ZmqError()
208
209 def _queue_request(self, msg, callback):
210 handler = self._find_handler(msg['msg_type'], callback)
211 self.handler_queue.put(handler)
212 self.command_queue.put(msg)
213
214 def execute(self, code, callback=None):
215 # Create class for content/msg creation. Related to, but possibly
216 # not in Session.
217 content = dict(code=code)
218 msg = self.session.msg('execute_request', content)
219 self._queue_request(msg, callback)
220 return msg['header']['msg_id']
221
222 def complete(self, text, line, block=None, callback=None):
223 content = dict(text=text, line=line)
224 msg = self.session.msg('complete_request', content)
225 return self._queue_request(msg, callback)
226 return msg['header']['msg_id']
227
228 def object_info(self, oname, callback=None):
229 content = dict(oname=oname)
230 msg = self.session.msg('object_info_request', content)
231 return self._queue_request(msg, callback)
232 return msg['header']['msg_id']
233
234 def _find_handler(self, name, callback):
235 if callback is not None:
236 return callback
237 handler = self.handlers.get(name)
238 if handler is None:
239 raise MissingHandlerError('No handler defined for method: %s' % name)
240 return handler
241
242 def override_call_handler(self, func):
243 """Permanently override the call_handler.
244
245 The function func will be called as::
246
247 func(handler, msg)
248
249 And must call::
250
251 handler(msg)
252
253 in the main thread.
254 """
255 assert callable(func), "not a callable: %r" % func
256 self._overriden_call_handler = func
257
258 def call_handler(self, handler, msg):
259 if self._overriden_call_handler is not None:
260 self._overriden_call_handler(handler, msg)
261 elif hasattr(self, '_call_handler'):
262 call_handler = getattr(self, '_call_handler')
263 call_handler(handler, msg)
264 else:
265 raise RuntimeError('no handler!')
266
267
268 class RepSocketChannel(ZmqSocketChannel):
269
270 def on_raw_input():
271 pass
@@ -0,0 +1,52 b''
1 from Queue import Queue, Empty
2 import time
3
4 from kernelmanager import KernelManager
5
6 xreq_addr = ('127.0.0.1',5575)
7 sub_addr = ('127.0.0.1', 5576)
8 rep_addr = ('127.0.0.1', 5577)
9
10
11 km = KernelManager(xreq_addr, sub_addr, rep_addr)
12 # xreq_channel = km.get_xreq_channel()
13 sub_channel = km.get_sub_channel()
14
15 # xreq_channel.start()
16 sub_channel.start()
17
18 print "Channels are started"
19
20 def printer(msg):
21 print
22 print msg
23
24 class CallHandler(object):
25
26 def __init__(self):
27 self.queue = Queue()
28
29 def __call__(self, handler, msg):
30 self.queue.put((handler, msg))
31
32 def handle(self):
33 try:
34 handler, msg = self.queue.get(block=False)
35 except Empty:
36 pass
37 else:
38 handler(msg)
39
40 call_handler = CallHandler()
41 sub_channel.override_call_handler(call_handler)
42 sub_channel.add_handler(printer, 'pyin')
43 sub_channel.add_handler(printer, 'pyout')
44 sub_channel.add_handler(printer, 'stdout')
45 sub_channel.add_handler(printer, 'stderr')
46
47 for i in range(100):
48 call_handler.handle()
49 time.sleep(1)
50
51 # xreq_channel.join()
52 sub_channel.join() No newline at end of file
@@ -219,6 +219,7 b' class Kernel(object):'
219 219 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
220 220 msg = self.reply_socket.recv_json()
221 221 omsg = Message(msg)
222 print>>sys.__stdout__
222 223 print>>sys.__stdout__, omsg
223 224 handler = self.handlers.get(omsg.msg_type, None)
224 225 if handler is None:
@@ -237,7 +238,8 b' def main():'
237 238 pub_conn = connection % (port_base+1)
238 239
239 240 print >>sys.__stdout__, "Starting the kernel..."
240 print >>sys.__stdout__, "On:",rep_conn, pub_conn
241 print >>sys.__stdout__, "XREP Channel:", rep_conn
242 print >>sys.__stdout__, "PUB Channel:", pub_conn
241 243
242 244 session = Session(username=u'kernel')
243 245
@@ -64,9 +64,12 b' def extract_header(msg_or_header):'
64 64
65 65 class Session(object):
66 66
67 def __init__(self, username=os.environ.get('USER','username')):
67 def __init__(self, username=os.environ.get('USER','username'), session=None):
68 68 self.username = username
69 if session is None:
69 70 self.session = str(uuid.uuid4())
71 else:
72 self.session = session
70 73 self.msg_id = 0
71 74
72 75 def msg_header(self):
General Comments 0
You need to be logged in to leave comments. Login now