##// END OF EJS Templates
Channels can no longer be restarted.
Brian Granger -
Show More
@@ -1,493 +1,492 b''
1 """Kernel frontend classes.
1 """Kernel frontend classes.
2
2
3 TODO: Create logger to handle debugging and console messages.
3 TODO: Create logger to handle debugging and console messages.
4
4
5 """
5 """
6
6
7 # Standard library imports.
7 # Standard library imports.
8 from Queue import Queue, Empty
8 from Queue import Queue, Empty
9 from subprocess import Popen
9 from subprocess import Popen
10 from threading import Thread
10 from threading import Thread
11 import time
11 import time
12 import traceback
12 import traceback
13
13
14 # System library imports.
14 # System library imports.
15 import zmq
15 import zmq
16 from zmq import POLLIN, POLLOUT, POLLERR
16 from zmq import POLLIN, POLLOUT, POLLERR
17 from zmq.eventloop import ioloop
17 from zmq.eventloop import ioloop
18
18
19 # Local imports.
19 # Local imports.
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
21 Type
21 Type
22 from kernel import launch_kernel
22 from kernel import launch_kernel
23 from session import Session
23 from session import Session
24
24
25 # Constants.
25 # Constants.
26 LOCALHOST = '127.0.0.1'
26 LOCALHOST = '127.0.0.1'
27
27
28
28
29 class MissingHandlerError(Exception):
29 class MissingHandlerError(Exception):
30 pass
30 pass
31
31
32
32
33 class ZmqSocketChannel(Thread):
33 class ZmqSocketChannel(Thread):
34 """ The base class for the channels that use ZMQ sockets.
34 """ The base class for the channels that use ZMQ sockets.
35 """
35 """
36
36
37 def __init__(self, context, session, address=None):
37 def __init__(self, context, session, address=None):
38 super(ZmqSocketChannel, self).__init__()
38 super(ZmqSocketChannel, self).__init__()
39 self.daemon = True
39 self.daemon = True
40
40
41 self.context = context
41 self.context = context
42 self.session = session
42 self.session = session
43 self.address = address
43 self.address = address
44 self.socket = None
44 self.socket = None
45
45
46 def stop(self):
46 def stop(self):
47 """ Stop the thread's activity. Returns when the thread terminates.
47 """Stop the thread's activity. Returns when the thread terminates.
48
49 The thread will raise :class:`RuntimeError` if :method:`self.start`
50 is called again.
48 """
51 """
49 self.join()
52 self.join()
50
53
51 # Allow the thread to be started again.
52 # FIXME: Although this works (and there's no reason why it shouldn't),
53 # it feels wrong. Is there a cleaner way to achieve this?
54 Thread.__init__(self)
55
54
56 def get_address(self):
55 def get_address(self):
57 """ Get the channel's address. By the default, a channel is on
56 """ Get the channel's address. By the default, a channel is on
58 localhost with no port specified (a negative port number).
57 localhost with no port specified (a negative port number).
59 """
58 """
60 return self._address
59 return self._address
61
60
62 def set_adresss(self, address):
61 def set_adresss(self, address):
63 """ Set the channel's address. Should be a tuple of form:
62 """ Set the channel's address. Should be a tuple of form:
64 (ip address [str], port [int]).
63 (ip address [str], port [int]).
65 or None, in which case the address is reset to its default value.
64 or None, in which case the address is reset to its default value.
66 """
65 """
67 # FIXME: Validate address.
66 # FIXME: Validate address.
68 if self.is_alive():
67 if self.is_alive(): # This is Thread.is_alive
69 raise RuntimeError("Cannot set address on a running channel!")
68 raise RuntimeError("Cannot set address on a running channel!")
70 else:
69 else:
71 if address is None:
70 if address is None:
72 address = (LOCALHOST, 0)
71 address = (LOCALHOST, 0)
73 self._address = address
72 self._address = address
74
73
75 address = property(get_address, set_adresss)
74 address = property(get_address, set_adresss)
76
75
77
76
78 class SubSocketChannel(ZmqSocketChannel):
77 class SubSocketChannel(ZmqSocketChannel):
79
78
80 handlers = None
79 handlers = None
81 _overriden_call_handler = None
80 _overriden_call_handler = None
82
81
83 def __init__(self, context, session, address=None):
82 def __init__(self, context, session, address=None):
84 self.handlers = {}
83 self.handlers = {}
85 super(SubSocketChannel, self).__init__(context, session, address)
84 super(SubSocketChannel, self).__init__(context, session, address)
86
85
87 def run(self):
86 def run(self):
88 self.socket = self.context.socket(zmq.SUB)
87 self.socket = self.context.socket(zmq.SUB)
89 self.socket.setsockopt(zmq.SUBSCRIBE,'')
88 self.socket.setsockopt(zmq.SUBSCRIBE,'')
90 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
89 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
91 self.socket.connect('tcp://%s:%i' % self.address)
90 self.socket.connect('tcp://%s:%i' % self.address)
92 self.ioloop = ioloop.IOLoop()
91 self.ioloop = ioloop.IOLoop()
93 self.ioloop.add_handler(self.socket, self._handle_events,
92 self.ioloop.add_handler(self.socket, self._handle_events,
94 POLLIN|POLLERR)
93 POLLIN|POLLERR)
95 self.ioloop.start()
94 self.ioloop.start()
96
95
97 def stop(self):
96 def stop(self):
98 self.ioloop.stop()
97 self.ioloop.stop()
99 super(SubSocketChannel, self).stop()
98 super(SubSocketChannel, self).stop()
100
99
101 def _handle_events(self, socket, events):
100 def _handle_events(self, socket, events):
102 # Turn on and off POLLOUT depending on if we have made a request
101 # Turn on and off POLLOUT depending on if we have made a request
103 if events & POLLERR:
102 if events & POLLERR:
104 self._handle_err()
103 self._handle_err()
105 if events & POLLIN:
104 if events & POLLIN:
106 self._handle_recv()
105 self._handle_recv()
107
106
108 def _handle_err(self):
107 def _handle_err(self):
109 raise zmq.ZmqError()
108 raise zmq.ZmqError()
110
109
111 def _handle_recv(self):
110 def _handle_recv(self):
112 msg = self.socket.recv_json()
111 msg = self.socket.recv_json()
113 self.call_handlers(msg)
112 self.call_handlers(msg)
114
113
115 def override_call_handler(self, func):
114 def override_call_handler(self, func):
116 """Permanently override the call_handler.
115 """Permanently override the call_handler.
117
116
118 The function func will be called as::
117 The function func will be called as::
119
118
120 func(handler, msg)
119 func(handler, msg)
121
120
122 And must call::
121 And must call::
123
122
124 handler(msg)
123 handler(msg)
125
124
126 in the main thread.
125 in the main thread.
127 """
126 """
128 assert callable(func), "not a callable: %r" % func
127 assert callable(func), "not a callable: %r" % func
129 self._overriden_call_handler = func
128 self._overriden_call_handler = func
130
129
131 def call_handlers(self, msg):
130 def call_handlers(self, msg):
132 handler = self.handlers.get(msg['msg_type'], None)
131 handler = self.handlers.get(msg['msg_type'], None)
133 if handler is not None:
132 if handler is not None:
134 try:
133 try:
135 self.call_handler(handler, msg)
134 self.call_handler(handler, msg)
136 except:
135 except:
137 # XXX: This should be logged at least
136 # XXX: This should be logged at least
138 traceback.print_last()
137 traceback.print_last()
139
138
140 def call_handler(self, handler, msg):
139 def call_handler(self, handler, msg):
141 if self._overriden_call_handler is not None:
140 if self._overriden_call_handler is not None:
142 self._overriden_call_handler(handler, msg)
141 self._overriden_call_handler(handler, msg)
143 elif hasattr(self, '_call_handler'):
142 elif hasattr(self, '_call_handler'):
144 call_handler = getattr(self, '_call_handler')
143 call_handler = getattr(self, '_call_handler')
145 call_handler(handler, msg)
144 call_handler(handler, msg)
146 else:
145 else:
147 raise RuntimeError('no handler!')
146 raise RuntimeError('no handler!')
148
147
149 def add_handler(self, callback, msg_type):
148 def add_handler(self, callback, msg_type):
150 """Register a callback for msg type."""
149 """Register a callback for msg type."""
151 self.handlers[msg_type] = callback
150 self.handlers[msg_type] = callback
152
151
153 def remove_handler(self, msg_type):
152 def remove_handler(self, msg_type):
154 """Remove the callback for msg type."""
153 """Remove the callback for msg type."""
155 self.handlers.pop(msg_type, None)
154 self.handlers.pop(msg_type, None)
156
155
157 def flush(self, timeout=1.0):
156 def flush(self, timeout=1.0):
158 """Immediately processes all pending messages on the SUB channel.
157 """Immediately processes all pending messages on the SUB channel.
159
158
160 This method is thread safe.
159 This method is thread safe.
161
160
162 Parameters
161 Parameters
163 ----------
162 ----------
164 timeout : float, optional
163 timeout : float, optional
165 The maximum amount of time to spend flushing, in seconds. The
164 The maximum amount of time to spend flushing, in seconds. The
166 default is one second.
165 default is one second.
167 """
166 """
168 # We do the IOLoop callback process twice to ensure that the IOLoop
167 # We do the IOLoop callback process twice to ensure that the IOLoop
169 # gets to perform at least one full poll.
168 # gets to perform at least one full poll.
170 stop_time = time.time() + timeout
169 stop_time = time.time() + timeout
171 for i in xrange(2):
170 for i in xrange(2):
172 self._flushed = False
171 self._flushed = False
173 self.ioloop.add_callback(self._flush)
172 self.ioloop.add_callback(self._flush)
174 while not self._flushed and time.time() < stop_time:
173 while not self._flushed and time.time() < stop_time:
175 time.sleep(0.01)
174 time.sleep(0.01)
176
175
177 def _flush(self):
176 def _flush(self):
178 """Called in this thread by the IOLoop to indicate that all events have
177 """Called in this thread by the IOLoop to indicate that all events have
179 been processed.
178 been processed.
180 """
179 """
181 self._flushed = True
180 self._flushed = True
182
181
183
182
184 class XReqSocketChannel(ZmqSocketChannel):
183 class XReqSocketChannel(ZmqSocketChannel):
185
184
186 handler_queue = None
185 handler_queue = None
187 command_queue = None
186 command_queue = None
188 handlers = None
187 handlers = None
189 _overriden_call_handler = None
188 _overriden_call_handler = None
190
189
191 def __init__(self, context, session, address=None):
190 def __init__(self, context, session, address=None):
192 self.handlers = {}
191 self.handlers = {}
193 self.handler_queue = Queue()
192 self.handler_queue = Queue()
194 self.command_queue = Queue()
193 self.command_queue = Queue()
195 super(XReqSocketChannel, self).__init__(context, session, address)
194 super(XReqSocketChannel, self).__init__(context, session, address)
196
195
197 def run(self):
196 def run(self):
198 self.socket = self.context.socket(zmq.XREQ)
197 self.socket = self.context.socket(zmq.XREQ)
199 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
198 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
200 self.socket.connect('tcp://%s:%i' % self.address)
199 self.socket.connect('tcp://%s:%i' % self.address)
201 self.ioloop = ioloop.IOLoop()
200 self.ioloop = ioloop.IOLoop()
202 self.ioloop.add_handler(self.socket, self._handle_events,
201 self.ioloop.add_handler(self.socket, self._handle_events,
203 POLLIN|POLLOUT|POLLERR)
202 POLLIN|POLLOUT|POLLERR)
204 self.ioloop.start()
203 self.ioloop.start()
205
204
206 def stop(self):
205 def stop(self):
207 self.ioloop.stop()
206 self.ioloop.stop()
208 super(XReqSocketChannel, self).stop()
207 super(XReqSocketChannel, self).stop()
209
208
210 def _handle_events(self, socket, events):
209 def _handle_events(self, socket, events):
211 # Turn on and off POLLOUT depending on if we have made a request
210 # Turn on and off POLLOUT depending on if we have made a request
212 if events & POLLERR:
211 if events & POLLERR:
213 self._handle_err()
212 self._handle_err()
214 if events & POLLOUT:
213 if events & POLLOUT:
215 self._handle_send()
214 self._handle_send()
216 if events & POLLIN:
215 if events & POLLIN:
217 self._handle_recv()
216 self._handle_recv()
218
217
219 def _handle_recv(self):
218 def _handle_recv(self):
220 msg = self.socket.recv_json()
219 msg = self.socket.recv_json()
221 self.call_handlers(msg)
220 self.call_handlers(msg)
222
221
223 def _handle_send(self):
222 def _handle_send(self):
224 try:
223 try:
225 msg = self.command_queue.get(False)
224 msg = self.command_queue.get(False)
226 except Empty:
225 except Empty:
227 pass
226 pass
228 else:
227 else:
229 self.socket.send_json(msg)
228 self.socket.send_json(msg)
230
229
231 def _handle_err(self):
230 def _handle_err(self):
232 raise zmq.ZmqError()
231 raise zmq.ZmqError()
233
232
234 def _queue_request(self, msg, callback):
233 def _queue_request(self, msg, callback):
235 handler = self._find_handler(msg['msg_type'], callback)
234 handler = self._find_handler(msg['msg_type'], callback)
236 self.handler_queue.put(handler)
235 self.handler_queue.put(handler)
237 self.command_queue.put(msg)
236 self.command_queue.put(msg)
238
237
239 def execute(self, code, callback=None):
238 def execute(self, code, callback=None):
240 # Create class for content/msg creation. Related to, but possibly
239 # Create class for content/msg creation. Related to, but possibly
241 # not in Session.
240 # not in Session.
242 content = dict(code=code)
241 content = dict(code=code)
243 msg = self.session.msg('execute_request', content)
242 msg = self.session.msg('execute_request', content)
244 self._queue_request(msg, callback)
243 self._queue_request(msg, callback)
245 return msg['header']['msg_id']
244 return msg['header']['msg_id']
246
245
247 def complete(self, text, line, block=None, callback=None):
246 def complete(self, text, line, block=None, callback=None):
248 content = dict(text=text, line=line)
247 content = dict(text=text, line=line)
249 msg = self.session.msg('complete_request', content)
248 msg = self.session.msg('complete_request', content)
250 self._queue_request(msg, callback)
249 self._queue_request(msg, callback)
251 return msg['header']['msg_id']
250 return msg['header']['msg_id']
252
251
253 def object_info(self, oname, callback=None):
252 def object_info(self, oname, callback=None):
254 content = dict(oname=oname)
253 content = dict(oname=oname)
255 msg = self.session.msg('object_info_request', content)
254 msg = self.session.msg('object_info_request', content)
256 self._queue_request(msg, callback)
255 self._queue_request(msg, callback)
257 return msg['header']['msg_id']
256 return msg['header']['msg_id']
258
257
259 def _find_handler(self, name, callback):
258 def _find_handler(self, name, callback):
260 if callback is not None:
259 if callback is not None:
261 return callback
260 return callback
262 handler = self.handlers.get(name)
261 handler = self.handlers.get(name)
263 if handler is None:
262 if handler is None:
264 raise MissingHandlerError(
263 raise MissingHandlerError(
265 'No handler defined for method: %s' % name)
264 'No handler defined for method: %s' % name)
266 return handler
265 return handler
267
266
268 def override_call_handler(self, func):
267 def override_call_handler(self, func):
269 """Permanently override the call_handler.
268 """Permanently override the call_handler.
270
269
271 The function func will be called as::
270 The function func will be called as::
272
271
273 func(handler, msg)
272 func(handler, msg)
274
273
275 And must call::
274 And must call::
276
275
277 handler(msg)
276 handler(msg)
278
277
279 in the main thread.
278 in the main thread.
280 """
279 """
281 assert callable(func), "not a callable: %r" % func
280 assert callable(func), "not a callable: %r" % func
282 self._overriden_call_handler = func
281 self._overriden_call_handler = func
283
282
284 def call_handlers(self, msg):
283 def call_handlers(self, msg):
285 try:
284 try:
286 handler = self.handler_queue.get(False)
285 handler = self.handler_queue.get(False)
287 except Empty:
286 except Empty:
288 print "Message received with no handler!!!"
287 print "Message received with no handler!!!"
289 print msg
288 print msg
290 else:
289 else:
291 self.call_handler(handler, msg)
290 self.call_handler(handler, msg)
292
291
293 def call_handler(self, handler, msg):
292 def call_handler(self, handler, msg):
294 if self._overriden_call_handler is not None:
293 if self._overriden_call_handler is not None:
295 self._overriden_call_handler(handler, msg)
294 self._overriden_call_handler(handler, msg)
296 elif hasattr(self, '_call_handler'):
295 elif hasattr(self, '_call_handler'):
297 call_handler = getattr(self, '_call_handler')
296 call_handler = getattr(self, '_call_handler')
298 call_handler(handler, msg)
297 call_handler(handler, msg)
299 else:
298 else:
300 raise RuntimeError('no handler!')
299 raise RuntimeError('no handler!')
301
300
302
301
303 class RepSocketChannel(ZmqSocketChannel):
302 class RepSocketChannel(ZmqSocketChannel):
304
303
305 def on_raw_input(self):
304 def on_raw_input(self):
306 pass
305 pass
307
306
308
307
309 class KernelManager(HasTraits):
308 class KernelManager(HasTraits):
310 """ Manages a kernel for a frontend.
309 """ Manages a kernel for a frontend.
311
310
312 The SUB channel is for the frontend to receive messages published by the
311 The SUB channel is for the frontend to receive messages published by the
313 kernel.
312 kernel.
314
313
315 The REQ channel is for the frontend to make requests of the kernel.
314 The REQ channel is for the frontend to make requests of the kernel.
316
315
317 The REP channel is for the kernel to request stdin (raw_input) from the
316 The REP channel is for the kernel to request stdin (raw_input) from the
318 frontend.
317 frontend.
319 """
318 """
320
319
321 # Whether the kernel manager is currently listening on its channels.
320 # Whether the kernel manager is currently listening on its channels.
322 is_listening = Bool(False)
321 is_listening = Bool(False)
323
322
324 # The PyZMQ Context to use for communication with the kernel.
323 # The PyZMQ Context to use for communication with the kernel.
325 context = Instance(zmq.Context, ())
324 context = Instance(zmq.Context, ())
326
325
327 # The Session to use for communication with the kernel.
326 # The Session to use for communication with the kernel.
328 session = Instance(Session, ())
327 session = Instance(Session, ())
329
328
330 # The classes to use for the various channels.
329 # The classes to use for the various channels.
331 sub_channel_class = Type(SubSocketChannel)
330 sub_channel_class = Type(SubSocketChannel)
332 xreq_channel_class = Type(XReqSocketChannel)
331 xreq_channel_class = Type(XReqSocketChannel)
333 rep_channel_class = Type(RepSocketChannel)
332 rep_channel_class = Type(RepSocketChannel)
334
333
335 # Protected traits.
334 # Protected traits.
336 _kernel = Instance(Popen)
335 _kernel = Instance(Popen)
337 _sub_channel = Any
336 _sub_channel = Any
338 _xreq_channel = Any
337 _xreq_channel = Any
339 _rep_channel = Any
338 _rep_channel = Any
340
339
341 #--------------------------------------------------------------------------
340 #--------------------------------------------------------------------------
342 # Channel management methods:
341 # Channel management methods:
343 #--------------------------------------------------------------------------
342 #--------------------------------------------------------------------------
344
343
345 def start_listening(self):
344 def start_listening(self):
346 """Starts listening on the specified ports. If already listening, raises
345 """Starts listening on the specified ports. If already listening, raises
347 a RuntimeError.
346 a RuntimeError.
348 """
347 """
349 if self.is_listening:
348 if self.is_listening:
350 raise RuntimeError("Cannot start listening. Already listening!")
349 raise RuntimeError("Cannot start listening. Already listening!")
351 else:
350 else:
352 self.is_listening = True
351 self.is_listening = True
353 self.sub_channel.start()
352 self.sub_channel.start()
354 self.xreq_channel.start()
353 self.xreq_channel.start()
355 self.rep_channel.start()
354 self.rep_channel.start()
356
355
357 @property
356 @property
358 def is_alive(self):
357 def is_alive(self):
359 """ Returns whether the kernel is alive. """
358 """ Returns whether the kernel is alive. """
360 if self.is_listening:
359 if self.is_listening:
361 # TODO: check if alive.
360 # TODO: check if alive.
362 return True
361 return True
363 else:
362 else:
364 return False
363 return False
365
364
366 def stop_listening(self):
365 def stop_listening(self):
367 """Stops listening. If not listening, does nothing. """
366 """Stops listening. If not listening, does nothing. """
368 if self.is_listening:
367 if self.is_listening:
369 self.is_listening = False
368 self.is_listening = False
370 self.sub_channel.stop()
369 self.sub_channel.stop()
371 self.xreq_channel.stop()
370 self.xreq_channel.stop()
372 self.rep_channel.stop()
371 self.rep_channel.stop()
373
372
374 #--------------------------------------------------------------------------
373 #--------------------------------------------------------------------------
375 # Kernel process management methods:
374 # Kernel process management methods:
376 #--------------------------------------------------------------------------
375 #--------------------------------------------------------------------------
377
376
378 def start_kernel(self):
377 def start_kernel(self):
379 """Starts a kernel process and configures the manager to use it.
378 """Starts a kernel process and configures the manager to use it.
380
379
381 If ports have been specified via the address attributes, they are used.
380 If ports have been specified via the address attributes, they are used.
382 Otherwise, open ports are chosen by the OS and the channel port
381 Otherwise, open ports are chosen by the OS and the channel port
383 attributes are configured as appropriate.
382 attributes are configured as appropriate.
384 """
383 """
385 xreq, sub = self.xreq_address, self.sub_address
384 xreq, sub = self.xreq_address, self.sub_address
386 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
385 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
387 raise RuntimeError("Can only launch a kernel on localhost."
386 raise RuntimeError("Can only launch a kernel on localhost."
388 "Make sure that the '*_address' attributes are "
387 "Make sure that the '*_address' attributes are "
389 "configured properly.")
388 "configured properly.")
390
389
391 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
390 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
392 self.set_kernel(kernel)
391 self.set_kernel(kernel)
393 self.xreq_address = (LOCALHOST, xrep)
392 self.xreq_address = (LOCALHOST, xrep)
394 self.sub_address = (LOCALHOST, pub)
393 self.sub_address = (LOCALHOST, pub)
395
394
396 def set_kernel(self, kernel):
395 def set_kernel(self, kernel):
397 """Sets the kernel manager's kernel to an existing kernel process.
396 """Sets the kernel manager's kernel to an existing kernel process.
398
397
399 It is *not* necessary to a set a kernel to communicate with it via the
398 It is *not* necessary to a set a kernel to communicate with it via the
400 channels, and those objects must be configured separately. It
399 channels, and those objects must be configured separately. It
401 *is* necessary to set a kernel if you want to use the manager (or
400 *is* necessary to set a kernel if you want to use the manager (or
402 frontends that use the manager) to signal and/or kill the kernel.
401 frontends that use the manager) to signal and/or kill the kernel.
403
402
404 Parameters:
403 Parameters:
405 -----------
404 -----------
406 kernel : Popen
405 kernel : Popen
407 An existing kernel process.
406 An existing kernel process.
408 """
407 """
409 self._kernel = kernel
408 self._kernel = kernel
410
409
411 @property
410 @property
412 def has_kernel(self):
411 def has_kernel(self):
413 """Returns whether a kernel process has been specified for the kernel
412 """Returns whether a kernel process has been specified for the kernel
414 manager.
413 manager.
415
414
416 A kernel process can be set via 'start_kernel' or 'set_kernel'.
415 A kernel process can be set via 'start_kernel' or 'set_kernel'.
417 """
416 """
418 return self._kernel is not None
417 return self._kernel is not None
419
418
420 def kill_kernel(self):
419 def kill_kernel(self):
421 """ Kill the running kernel. """
420 """ Kill the running kernel. """
422 if self._kernel:
421 if self._kernel:
423 self._kernel.kill()
422 self._kernel.kill()
424 self._kernel = None
423 self._kernel = None
425 else:
424 else:
426 raise RuntimeError("Cannot kill kernel. No kernel is running!")
425 raise RuntimeError("Cannot kill kernel. No kernel is running!")
427
426
428 def signal_kernel(self, signum):
427 def signal_kernel(self, signum):
429 """ Sends a signal to the kernel. """
428 """ Sends a signal to the kernel. """
430 if self._kernel:
429 if self._kernel:
431 self._kernel.send_signal(signum)
430 self._kernel.send_signal(signum)
432 else:
431 else:
433 raise RuntimeError("Cannot signal kernel. No kernel is running!")
432 raise RuntimeError("Cannot signal kernel. No kernel is running!")
434
433
435 #--------------------------------------------------------------------------
434 #--------------------------------------------------------------------------
436 # Channels used for communication with the kernel:
435 # Channels used for communication with the kernel:
437 #--------------------------------------------------------------------------
436 #--------------------------------------------------------------------------
438
437
439 @property
438 @property
440 def sub_channel(self):
439 def sub_channel(self):
441 """Get the SUB socket channel object."""
440 """Get the SUB socket channel object."""
442 if self._sub_channel is None:
441 if self._sub_channel is None:
443 self._sub_channel = self.sub_channel_class(self.context,
442 self._sub_channel = self.sub_channel_class(self.context,
444 self.session)
443 self.session)
445 return self._sub_channel
444 return self._sub_channel
446
445
447 @property
446 @property
448 def xreq_channel(self):
447 def xreq_channel(self):
449 """Get the REQ socket channel object to make requests of the kernel."""
448 """Get the REQ socket channel object to make requests of the kernel."""
450 if self._xreq_channel is None:
449 if self._xreq_channel is None:
451 self._xreq_channel = self.xreq_channel_class(self.context,
450 self._xreq_channel = self.xreq_channel_class(self.context,
452 self.session)
451 self.session)
453 return self._xreq_channel
452 return self._xreq_channel
454
453
455 @property
454 @property
456 def rep_channel(self):
455 def rep_channel(self):
457 """Get the REP socket channel object to handle stdin (raw_input)."""
456 """Get the REP socket channel object to handle stdin (raw_input)."""
458 if self._rep_channel is None:
457 if self._rep_channel is None:
459 self._rep_channel = self.rep_channel_class(self.context,
458 self._rep_channel = self.rep_channel_class(self.context,
460 self.session)
459 self.session)
461 return self._rep_channel
460 return self._rep_channel
462
461
463 #--------------------------------------------------------------------------
462 #--------------------------------------------------------------------------
464 # Delegates for the Channel address attributes:
463 # Delegates for the Channel address attributes:
465 #--------------------------------------------------------------------------
464 #--------------------------------------------------------------------------
466
465
467 def get_sub_address(self):
466 def get_sub_address(self):
468 return self.sub_channel.address
467 return self.sub_channel.address
469
468
470 def set_sub_address(self, address):
469 def set_sub_address(self, address):
471 self.sub_channel.address = address
470 self.sub_channel.address = address
472
471
473 sub_address = property(get_sub_address, set_sub_address,
472 sub_address = property(get_sub_address, set_sub_address,
474 doc="The address used by SUB socket channel.")
473 doc="The address used by SUB socket channel.")
475
474
476 def get_xreq_address(self):
475 def get_xreq_address(self):
477 return self.xreq_channel.address
476 return self.xreq_channel.address
478
477
479 def set_xreq_address(self, address):
478 def set_xreq_address(self, address):
480 self.xreq_channel.address = address
479 self.xreq_channel.address = address
481
480
482 xreq_address = property(get_xreq_address, set_xreq_address,
481 xreq_address = property(get_xreq_address, set_xreq_address,
483 doc="The address used by XREQ socket channel.")
482 doc="The address used by XREQ socket channel.")
484
483
485 def get_rep_address(self):
484 def get_rep_address(self):
486 return self.rep_channel.address
485 return self.rep_channel.address
487
486
488 def set_rep_address(self, address):
487 def set_rep_address(self, address):
489 self.rep_channel.address = address
488 self.rep_channel.address = address
490
489
491 rep_address = property(get_rep_address, set_rep_address,
490 rep_address = property(get_rep_address, set_rep_address,
492 doc="The address used by REP socket channel.")
491 doc="The address used by REP socket channel.")
493
492
General Comments 0
You need to be logged in to leave comments. Login now