##// END OF EJS Templates
* Added 'stop' methods to the ZmqSocketChannels...
epatters -
Show More
@@ -1,359 +1,403 b''
1 """Kernel frontend classes.
1 """Kernel frontend classes.
2
2
3 TODO: Create logger to handle debugging and console messages.
3 TODO: Create logger to handle debugging and console messages.
4
4
5 """
5 """
6
6
7 # Standard library imports.
7 # Standard library imports.
8 from Queue import Queue, Empty
8 from Queue import Queue, Empty
9 from threading import Thread
9 from threading import Thread
10 import time
10 import time
11 import traceback
11 import traceback
12
12
13 # System library imports.
13 # System library imports.
14 import zmq
14 import zmq
15 from zmq import POLLIN, POLLOUT, POLLERR
15 from zmq import POLLIN, POLLOUT, POLLERR
16 from zmq.eventloop import ioloop
16 from zmq.eventloop import ioloop
17
17
18 # Local imports.
18 # Local imports.
19 from IPython.utils.traitlets import HasTraits, Any, Int, Instance, Str, Type
19 from IPython.utils.traitlets import HasTraits, Any, Int, Instance, Str, Type
20 from session import Session
20 from session import Session
21
21
22
22
23 class MissingHandlerError(Exception):
23 class MissingHandlerError(Exception):
24 pass
24 pass
25
25
26
26
27 class ZmqSocketChannel(Thread):
27 class ZmqSocketChannel(Thread):
28 """ The base class for the channels that use ZMQ sockets.
28 """ The base class for the channels that use ZMQ sockets.
29 """
29 """
30
30
31 def __init__(self, context, session, addr=None):
31 def __init__(self, context, session, address=None):
32 super(ZmqSocketChannel, self).__init__()
33 self.daemon = True
34
32 self.context = context
35 self.context = context
33 self.session = session
36 self.session = session
34 self.addr = addr
37 self.address = address
35 self.socket = None
38 self.socket = None
36
39
37 super(ZmqSocketChannel, self).__init__()
40 def stop(self):
38 self.daemon = True
41 """ Stop the thread's activity. Returns when the thread terminates.
42 """
43 raise NotImplementedError
44
45 def get_address(self):
46 """ Get the channel's address.
47 """
48 return self._address
49
50 def set_adresss(self, address):
51 """ Set the channel's address. Should be a tuple of form:
52 (ip address [str], port [int])
53 or 'None' to indicate that no address has been specified.
54 """
55 # FIXME: Validate address.
56 if self.is_alive():
57 raise RuntimeError("Cannot set address on a running channel!")
58 else:
59 self._address = address
60
61 address = property(get_address, set_adresss)
39
62
40
63
41 class SubSocketChannel(ZmqSocketChannel):
64 class SubSocketChannel(ZmqSocketChannel):
42
65
43 handlers = None
66 handlers = None
44 _overriden_call_handler = None
67 _overriden_call_handler = None
45
68
46 def __init__(self, context, session, addr=None):
69 def __init__(self, context, session, address=None):
47 self.handlers = {}
70 self.handlers = {}
48 super(SubSocketChannel, self).__init__(context, session, addr)
71 super(SubSocketChannel, self).__init__(context, session, address)
49
72
50 def run(self):
73 def run(self):
51 self.socket = self.context.socket(zmq.SUB)
74 self.socket = self.context.socket(zmq.SUB)
52 self.socket.setsockopt(zmq.SUBSCRIBE,'')
75 self.socket.setsockopt(zmq.SUBSCRIBE,'')
53 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
76 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
54 self.socket.connect('tcp://%s:%i' % self.addr)
77 self.socket.connect('tcp://%s:%i' % self.address)
55 self.ioloop = ioloop.IOLoop()
78 self.ioloop = ioloop.IOLoop()
56 self.ioloop.add_handler(self.socket, self._handle_events,
79 self.ioloop.add_handler(self.socket, self._handle_events,
57 POLLIN|POLLERR)
80 POLLIN|POLLERR)
58 self.ioloop.start()
81 self.ioloop.start()
59
82
83 def stop(self):
84 self.ioloop.stop()
85 self.join()
86
60 def _handle_events(self, socket, events):
87 def _handle_events(self, socket, events):
61 # Turn on and off POLLOUT depending on if we have made a request
88 # Turn on and off POLLOUT depending on if we have made a request
62 if events & POLLERR:
89 if events & POLLERR:
63 self._handle_err()
90 self._handle_err()
64 if events & POLLIN:
91 if events & POLLIN:
65 self._handle_recv()
92 self._handle_recv()
66
93
67 def _handle_err(self):
94 def _handle_err(self):
68 raise zmq.ZmqError()
95 raise zmq.ZmqError()
69
96
70 def _handle_recv(self):
97 def _handle_recv(self):
71 msg = self.socket.recv_json()
98 msg = self.socket.recv_json()
72 self.call_handlers(msg)
99 self.call_handlers(msg)
73
100
74 def override_call_handler(self, func):
101 def override_call_handler(self, func):
75 """Permanently override the call_handler.
102 """Permanently override the call_handler.
76
103
77 The function func will be called as::
104 The function func will be called as::
78
105
79 func(handler, msg)
106 func(handler, msg)
80
107
81 And must call::
108 And must call::
82
109
83 handler(msg)
110 handler(msg)
84
111
85 in the main thread.
112 in the main thread.
86 """
113 """
87 assert callable(func), "not a callable: %r" % func
114 assert callable(func), "not a callable: %r" % func
88 self._overriden_call_handler = func
115 self._overriden_call_handler = func
89
116
90 def call_handlers(self, msg):
117 def call_handlers(self, msg):
91 handler = self.handlers.get(msg['msg_type'], None)
118 handler = self.handlers.get(msg['msg_type'], None)
92 if handler is not None:
119 if handler is not None:
93 try:
120 try:
94 self.call_handler(handler, msg)
121 self.call_handler(handler, msg)
95 except:
122 except:
96 # XXX: This should be logged at least
123 # XXX: This should be logged at least
97 traceback.print_last()
124 traceback.print_last()
98
125
99 def call_handler(self, handler, msg):
126 def call_handler(self, handler, msg):
100 if self._overriden_call_handler is not None:
127 if self._overriden_call_handler is not None:
101 self._overriden_call_handler(handler, msg)
128 self._overriden_call_handler(handler, msg)
102 elif hasattr(self, '_call_handler'):
129 elif hasattr(self, '_call_handler'):
103 call_handler = getattr(self, '_call_handler')
130 call_handler = getattr(self, '_call_handler')
104 call_handler(handler, msg)
131 call_handler(handler, msg)
105 else:
132 else:
106 raise RuntimeError('no handler!')
133 raise RuntimeError('no handler!')
107
134
108 def add_handler(self, callback, msg_type):
135 def add_handler(self, callback, msg_type):
109 """Register a callback for msg type."""
136 """Register a callback for msg type."""
110 self.handlers[msg_type] = callback
137 self.handlers[msg_type] = callback
111
138
112 def remove_handler(self, msg_type):
139 def remove_handler(self, msg_type):
113 """Remove the callback for msg type."""
140 """Remove the callback for msg type."""
114 self.handlers.pop(msg_type, None)
141 self.handlers.pop(msg_type, None)
115
142
116 def flush(self):
143 def flush(self):
117 """Immediately processes all pending messages on the SUB channel. This
144 """Immediately processes all pending messages on the SUB channel. This
118 method is thread safe.
145 method is thread safe.
119 """
146 """
120 self._flushed = False
147 self._flushed = False
121 self.ioloop.add_callback(self._flush)
148 self.ioloop.add_callback(self._flush)
122 while not self._flushed:
149 while not self._flushed:
123 time.sleep(0.01)
150 time.sleep(0.01)
124
151
125 def _flush(self):
152 def _flush(self):
126 """Called in this thread by the IOLoop to indicate that all events have
153 """Called in this thread by the IOLoop to indicate that all events have
127 been processed.
154 been processed.
128 """
155 """
129 self._flushed = True
156 self._flushed = True
130
157
131
158
132 class XReqSocketChannel(ZmqSocketChannel):
159 class XReqSocketChannel(ZmqSocketChannel):
133
160
134 handler_queue = None
161 handler_queue = None
135 command_queue = None
162 command_queue = None
136 handlers = None
163 handlers = None
137 _overriden_call_handler = None
164 _overriden_call_handler = None
138
165
139 def __init__(self, context, session, addr=None):
166 def __init__(self, context, session, address=None):
140 self.handlers = {}
167 self.handlers = {}
141 self.handler_queue = Queue()
168 self.handler_queue = Queue()
142 self.command_queue = Queue()
169 self.command_queue = Queue()
143 super(XReqSocketChannel, self).__init__(context, session, addr)
170 super(XReqSocketChannel, self).__init__(context, session, address)
144
171
145 def run(self):
172 def run(self):
146 self.socket = self.context.socket(zmq.XREQ)
173 self.socket = self.context.socket(zmq.XREQ)
147 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
174 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
148 self.socket.connect('tcp://%s:%i' % self.addr)
175 self.socket.connect('tcp://%s:%i' % self.address)
149 self.ioloop = ioloop.IOLoop()
176 self.ioloop = ioloop.IOLoop()
150 self.ioloop.add_handler(self.socket, self._handle_events,
177 self.ioloop.add_handler(self.socket, self._handle_events,
151 POLLIN|POLLOUT|POLLERR)
178 POLLIN|POLLOUT|POLLERR)
152 self.ioloop.start()
179 self.ioloop.start()
153
180
181 def stop(self):
182 self.ioloop.stop()
183 self.join()
184
154 def _handle_events(self, socket, events):
185 def _handle_events(self, socket, events):
155 # Turn on and off POLLOUT depending on if we have made a request
186 # Turn on and off POLLOUT depending on if we have made a request
156 if events & POLLERR:
187 if events & POLLERR:
157 self._handle_err()
188 self._handle_err()
158 if events & POLLOUT:
189 if events & POLLOUT:
159 self._handle_send()
190 self._handle_send()
160 if events & POLLIN:
191 if events & POLLIN:
161 self._handle_recv()
192 self._handle_recv()
162
193
163 def _handle_recv(self):
194 def _handle_recv(self):
164 msg = self.socket.recv_json()
195 msg = self.socket.recv_json()
165 self.call_handlers(msg)
196 self.call_handlers(msg)
166
197
167 def _handle_send(self):
198 def _handle_send(self):
168 try:
199 try:
169 msg = self.command_queue.get(False)
200 msg = self.command_queue.get(False)
170 except Empty:
201 except Empty:
171 pass
202 pass
172 else:
203 else:
173 self.socket.send_json(msg)
204 self.socket.send_json(msg)
174
205
175 def _handle_err(self):
206 def _handle_err(self):
176 raise zmq.ZmqError()
207 raise zmq.ZmqError()
177
208
178 def _queue_request(self, msg, callback):
209 def _queue_request(self, msg, callback):
179 handler = self._find_handler(msg['msg_type'], callback)
210 handler = self._find_handler(msg['msg_type'], callback)
180 self.handler_queue.put(handler)
211 self.handler_queue.put(handler)
181 self.command_queue.put(msg)
212 self.command_queue.put(msg)
182
213
183 def execute(self, code, callback=None):
214 def execute(self, code, callback=None):
184 # Create class for content/msg creation. Related to, but possibly
215 # Create class for content/msg creation. Related to, but possibly
185 # not in Session.
216 # not in Session.
186 content = dict(code=code)
217 content = dict(code=code)
187 msg = self.session.msg('execute_request', content)
218 msg = self.session.msg('execute_request', content)
188 self._queue_request(msg, callback)
219 self._queue_request(msg, callback)
189 return msg['header']['msg_id']
220 return msg['header']['msg_id']
190
221
191 def complete(self, text, line, block=None, callback=None):
222 def complete(self, text, line, block=None, callback=None):
192 content = dict(text=text, line=line)
223 content = dict(text=text, line=line)
193 msg = self.session.msg('complete_request', content)
224 msg = self.session.msg('complete_request', content)
194 self._queue_request(msg, callback)
225 self._queue_request(msg, callback)
195 return msg['header']['msg_id']
226 return msg['header']['msg_id']
196
227
197 def object_info(self, oname, callback=None):
228 def object_info(self, oname, callback=None):
198 content = dict(oname=oname)
229 content = dict(oname=oname)
199 msg = self.session.msg('object_info_request', content)
230 msg = self.session.msg('object_info_request', content)
200 self._queue_request(msg, callback)
231 self._queue_request(msg, callback)
201 return msg['header']['msg_id']
232 return msg['header']['msg_id']
202
233
203 def _find_handler(self, name, callback):
234 def _find_handler(self, name, callback):
204 if callback is not None:
235 if callback is not None:
205 return callback
236 return callback
206 handler = self.handlers.get(name)
237 handler = self.handlers.get(name)
207 if handler is None:
238 if handler is None:
208 raise MissingHandlerError(
239 raise MissingHandlerError(
209 'No handler defined for method: %s' % name)
240 'No handler defined for method: %s' % name)
210 return handler
241 return handler
211
242
212 def override_call_handler(self, func):
243 def override_call_handler(self, func):
213 """Permanently override the call_handler.
244 """Permanently override the call_handler.
214
245
215 The function func will be called as::
246 The function func will be called as::
216
247
217 func(handler, msg)
248 func(handler, msg)
218
249
219 And must call::
250 And must call::
220
251
221 handler(msg)
252 handler(msg)
222
253
223 in the main thread.
254 in the main thread.
224 """
255 """
225 assert callable(func), "not a callable: %r" % func
256 assert callable(func), "not a callable: %r" % func
226 self._overriden_call_handler = func
257 self._overriden_call_handler = func
227
258
228 def call_handlers(self, msg):
259 def call_handlers(self, msg):
229 try:
260 try:
230 handler = self.handler_queue.get(False)
261 handler = self.handler_queue.get(False)
231 except Empty:
262 except Empty:
232 print "Message received with no handler!!!"
263 print "Message received with no handler!!!"
233 print msg
264 print msg
234 else:
265 else:
235 self.call_handler(handler, msg)
266 self.call_handler(handler, msg)
236
267
237 def call_handler(self, handler, msg):
268 def call_handler(self, handler, msg):
238 if self._overriden_call_handler is not None:
269 if self._overriden_call_handler is not None:
239 self._overriden_call_handler(handler, msg)
270 self._overriden_call_handler(handler, msg)
240 elif hasattr(self, '_call_handler'):
271 elif hasattr(self, '_call_handler'):
241 call_handler = getattr(self, '_call_handler')
272 call_handler = getattr(self, '_call_handler')
242 call_handler(handler, msg)
273 call_handler(handler, msg)
243 else:
274 else:
244 raise RuntimeError('no handler!')
275 raise RuntimeError('no handler!')
245
276
246
277
247 class RepSocketChannel(ZmqSocketChannel):
278 class RepSocketChannel(ZmqSocketChannel):
248
279
249 def on_raw_input():
280 def stop(self):
281 pass
282
283 def on_raw_input(self):
250 pass
284 pass
251
285
252
286
253 class KernelManager(HasTraits):
287 class KernelManager(HasTraits):
254 """ Manages a kernel for a frontend.
288 """ Manages a kernel for a frontend.
255
289
256 The SUB channel is for the frontend to receive messages published by the
290 The SUB channel is for the frontend to receive messages published by the
257 kernel.
291 kernel.
258
292
259 The REQ channel is for the frontend to make requests of the kernel.
293 The REQ channel is for the frontend to make requests of the kernel.
260
294
261 The REP channel is for the kernel to request stdin (raw_input) from the
295 The REP channel is for the kernel to request stdin (raw_input) from the
262 frontend.
296 frontend.
263 """
297 """
264
298
265 # The PyZMQ Context to use for communication with the kernel.
299 # The PyZMQ Context to use for communication with the kernel.
266 context = Instance(zmq.Context, ())
300 context = Instance(zmq.Context, ())
267
301
268 # The Session to use for communication with the kernel.
302 # The Session to use for communication with the kernel.
269 session = Instance(Session, ())
303 session = Instance(Session, ())
270
304
271 # The channels objects used for communication with the kernel.
272 # FIXME: Add '_traitname_default' instantiation method to Traitlets.
273 #sub_channel = Instance(SubSocketChannel)
274 #xreq_channel = Instance(XReqSocketChannel)
275 #rep_channel = Instance(RepSocketChannel)
276
277 # The classes to use for the various channels.
305 # The classes to use for the various channels.
278 sub_channel_class = Type(SubSocketChannel)
306 sub_channel_class = Type(SubSocketChannel)
279 xreq_channel_class = Type(XReqSocketChannel)
307 xreq_channel_class = Type(XReqSocketChannel)
280 rep_channel_class = Type(RepSocketChannel)
308 rep_channel_class = Type(RepSocketChannel)
281
282 # The addresses to use for the various channels. Should be tuples of form
283 # (ip_address, port).
284 #sub_address = DelegatesTo('sub_channel')
285 #xreq_address = DelegatesTo('xreq_channel')
286 #rep_address = DelegatesTo('rep_channel')
287
309
288 # Protected traits.
310 # Protected traits.
289 _sub_channel = Any
311 _sub_channel = Any
290 _xreq_channel = Any
312 _xreq_channel = Any
291 _rep_channel = Any
313 _rep_channel = Any
292
314
293 def __init__(self, **traits):
315 def __init__(self, **traits):
294 super(KernelManager, self).__init__()
316 super(KernelManager, self).__init__()
295
317
296 # FIXME: This should be the business of HasTraits. The convention is:
318 # FIXME: This should be the business of HasTraits. The convention is:
297 # HasTraits.__init__(self, **traits_to_be_initialized.)
319 # HasTraits.__init__(self, **traits_to_be_initialized.)
298 for trait in traits:
320 for trait in traits:
299 setattr(self, trait, traits[trait])
321 setattr(self, trait, traits[trait])
300
322
301 def start_kernel(self):
323 def start_kernel(self):
302 """Start a localhost kernel. If ports have been specified, use
324 """Start a localhost kernel. If ports have been specified, use
303 them. Otherwise, choose an open port at random.
325 them. Otherwise, choose an open port at random.
304 """
326 """
327 self.sub_channel.start()
328 self.xreq_channel.start()
329 self.rep_channel.start()
305
330
306 def kill_kernel(self):
331 def kill_kernel(self):
307 """Kill the running kernel"""
332 """Kill the running kernel.
333 """
334 self.sub_channel.stop()
335 self.xreq_channel.stop()
336 self.rep_channel.stop()
308
337
309 def is_alive(self):
338 def is_alive(self):
310 """Is the kernel alive?"""
339 """Is the kernel alive?"""
311 return True
340 return True
312
341
313 def signal_kernel(self, signum):
342 def signal_kernel(self, signum):
314 """Send signum to the kernel."""
343 """Send signum to the kernel."""
315
344
345 #--------------------------------------------------------------------------
346 # Channels used for communication with the kernel:
347 #--------------------------------------------------------------------------
348
316 @property
349 @property
317 def sub_channel(self):
350 def sub_channel(self):
318 """Get the SUB socket channel object."""
351 """Get the SUB socket channel object."""
319 if self._sub_channel is None:
352 if self._sub_channel is None:
320 self._sub_channel = self.sub_channel_class(self.context,
353 self._sub_channel = self.sub_channel_class(self.context,
321 self.session)
354 self.session)
322 return self._sub_channel
355 return self._sub_channel
323
356
324 @property
357 @property
325 def xreq_channel(self):
358 def xreq_channel(self):
326 """Get the REQ socket channel object to make requests of the kernel."""
359 """Get the REQ socket channel object to make requests of the kernel."""
327 if self._xreq_channel is None:
360 if self._xreq_channel is None:
328 self._xreq_channel = self.xreq_channel_class(self.context,
361 self._xreq_channel = self.xreq_channel_class(self.context,
329 self.session)
362 self.session)
330 return self._xreq_channel
363 return self._xreq_channel
331
364
332 @property
365 @property
333 def rep_channel(self):
366 def rep_channel(self):
334 """Get the REP socket channel object to handle stdin (raw_input)."""
367 """Get the REP socket channel object to handle stdin (raw_input)."""
335 if self._rep_channel is None:
368 if self._rep_channel is None:
336 self._rep_channel = self.rep_channel_class(self.context,
369 self._rep_channel = self.rep_channel_class(self.context,
337 self.session)
370 self.session)
338 return self._rep_channel
371 return self._rep_channel
339
372
373 #--------------------------------------------------------------------------
374 # Channel addresses:
375 #--------------------------------------------------------------------------
376
340 def get_sub_address(self):
377 def get_sub_address(self):
341 return self.sub_channel.addr
378 return self.sub_channel.address
342 def set_sub_address(self, addr):
379
343 self.sub_channel.addr = addr
380 def set_sub_address(self, address):
381 self.sub_channel.address = address
382
344 sub_address = property(get_sub_address, set_sub_address,
383 sub_address = property(get_sub_address, set_sub_address,
345 doc="The address used by SUB socket channel.")
384 doc="The address used by SUB socket channel.")
346
385
347 def get_xreq_address(self):
386 def get_xreq_address(self):
348 return self.xreq_channel.addr
387 return self.xreq_channel.address
349 def set_xreq_address(self, addr):
388
350 self.xreq_channel.addr = addr
389 def set_xreq_address(self, address):
390 self.xreq_channel.address = address
391
351 xreq_address = property(get_xreq_address, set_xreq_address,
392 xreq_address = property(get_xreq_address, set_xreq_address,
352 doc="The address used by XREQ socket channel.")
393 doc="The address used by XREQ socket channel.")
353
394
354 def get_rep_address(self):
395 def get_rep_address(self):
355 return self.rep_channel.addr
396 return self.rep_channel.address
356 def set_rep_address(self, addr):
397
357 self.rep_channel.addr = addr
398 def set_rep_address(self, address):
399 self.rep_channel.address = address
400
358 rep_address = property(get_rep_address, set_rep_address,
401 rep_address = property(get_rep_address, set_rep_address,
359 doc="The address used by REP socket channel.")
402 doc="The address used by REP socket channel.")
403
General Comments 0
You need to be logged in to leave comments. Login now