##// END OF EJS Templates
* Implemented KernelManager's 'signal_kernel' method....
epatters -
Show More
@@ -1,462 +1,493 b''
1 """Kernel frontend classes.
1 """Kernel frontend classes.
2
2
3 TODO: Create logger to handle debugging and console messages.
3 TODO: Create logger to handle debugging and console messages.
4
4
5 """
5 """
6
6
7 # Standard library imports.
7 # Standard library imports.
8 from Queue import Queue, Empty
8 from Queue import Queue, Empty
9 from subprocess import Popen
9 from subprocess import Popen
10 from threading import Thread
10 from threading import Thread
11 import time
11 import time
12 import traceback
12 import traceback
13
13
14 # System library imports.
14 # System library imports.
15 import zmq
15 import zmq
16 from zmq import POLLIN, POLLOUT, POLLERR
16 from zmq import POLLIN, POLLOUT, POLLERR
17 from zmq.eventloop import ioloop
17 from zmq.eventloop import ioloop
18
18
19 # Local imports.
19 # Local imports.
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
21 Type
21 Type
22 from kernel import launch_kernel
22 from kernel import launch_kernel
23 from session import Session
23 from session import Session
24
24
25 # Constants.
25 # Constants.
26 LOCALHOST = '127.0.0.1'
26 LOCALHOST = '127.0.0.1'
27
27
28
28
29 class MissingHandlerError(Exception):
29 class MissingHandlerError(Exception):
30 pass
30 pass
31
31
32
32
33 class ZmqSocketChannel(Thread):
33 class ZmqSocketChannel(Thread):
34 """ The base class for the channels that use ZMQ sockets.
34 """ The base class for the channels that use ZMQ sockets.
35 """
35 """
36
36
37 def __init__(self, context, session, address=None):
37 def __init__(self, context, session, address=None):
38 super(ZmqSocketChannel, self).__init__()
38 super(ZmqSocketChannel, self).__init__()
39 self.daemon = True
39 self.daemon = True
40
40
41 self.context = context
41 self.context = context
42 self.session = session
42 self.session = session
43 self.address = address
43 self.address = address
44 self.socket = None
44 self.socket = None
45
45
46 def stop(self):
46 def stop(self):
47 """ Stop the thread's activity. Returns when the thread terminates.
47 """ Stop the thread's activity. Returns when the thread terminates.
48 """
48 """
49 self.join()
49 self.join()
50
50
51 # Allow the thread to be started again.
51 # Allow the thread to be started again.
52 # FIXME: Although this works (and there's no reason why it shouldn't),
52 # FIXME: Although this works (and there's no reason why it shouldn't),
53 # it feels wrong. Is there a cleaner way to achieve this?
53 # it feels wrong. Is there a cleaner way to achieve this?
54 Thread.__init__(self)
54 Thread.__init__(self)
55
55
56 def get_address(self):
56 def get_address(self):
57 """ Get the channel's address. By the default, a channel is on
57 """ Get the channel's address. By the default, a channel is on
58 localhost with no port specified (a negative port number).
58 localhost with no port specified (a negative port number).
59 """
59 """
60 return self._address
60 return self._address
61
61
62 def set_adresss(self, address):
62 def set_adresss(self, address):
63 """ Set the channel's address. Should be a tuple of form:
63 """ Set the channel's address. Should be a tuple of form:
64 (ip address [str], port [int]).
64 (ip address [str], port [int]).
65 or None, in which case the address is reset to its default value.
65 or None, in which case the address is reset to its default value.
66 """
66 """
67 # FIXME: Validate address.
67 # FIXME: Validate address.
68 if self.is_alive():
68 if self.is_alive():
69 raise RuntimeError("Cannot set address on a running channel!")
69 raise RuntimeError("Cannot set address on a running channel!")
70 else:
70 else:
71 if address is None:
71 if address is None:
72 address = (LOCALHOST, -1)
72 address = (LOCALHOST, -1)
73 self._address = address
73 self._address = address
74
74
75 address = property(get_address, set_adresss)
75 address = property(get_address, set_adresss)
76
76
77
77
78 class SubSocketChannel(ZmqSocketChannel):
78 class SubSocketChannel(ZmqSocketChannel):
79
79
80 handlers = None
80 handlers = None
81 _overriden_call_handler = None
81 _overriden_call_handler = None
82
82
83 def __init__(self, context, session, address=None):
83 def __init__(self, context, session, address=None):
84 self.handlers = {}
84 self.handlers = {}
85 super(SubSocketChannel, self).__init__(context, session, address)
85 super(SubSocketChannel, self).__init__(context, session, address)
86
86
87 def run(self):
87 def run(self):
88 self.socket = self.context.socket(zmq.SUB)
88 self.socket = self.context.socket(zmq.SUB)
89 self.socket.setsockopt(zmq.SUBSCRIBE,'')
89 self.socket.setsockopt(zmq.SUBSCRIBE,'')
90 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
90 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
91 self.socket.connect('tcp://%s:%i' % self.address)
91 self.socket.connect('tcp://%s:%i' % self.address)
92 self.ioloop = ioloop.IOLoop()
92 self.ioloop = ioloop.IOLoop()
93 self.ioloop.add_handler(self.socket, self._handle_events,
93 self.ioloop.add_handler(self.socket, self._handle_events,
94 POLLIN|POLLERR)
94 POLLIN|POLLERR)
95 self.ioloop.start()
95 self.ioloop.start()
96
96
97 def stop(self):
97 def stop(self):
98 self.ioloop.stop()
98 self.ioloop.stop()
99 super(SubSocketChannel, self).stop()
99 super(SubSocketChannel, self).stop()
100
100
101 def _handle_events(self, socket, events):
101 def _handle_events(self, socket, events):
102 # Turn on and off POLLOUT depending on if we have made a request
102 # Turn on and off POLLOUT depending on if we have made a request
103 if events & POLLERR:
103 if events & POLLERR:
104 self._handle_err()
104 self._handle_err()
105 if events & POLLIN:
105 if events & POLLIN:
106 self._handle_recv()
106 self._handle_recv()
107
107
108 def _handle_err(self):
108 def _handle_err(self):
109 raise zmq.ZmqError()
109 raise zmq.ZmqError()
110
110
111 def _handle_recv(self):
111 def _handle_recv(self):
112 msg = self.socket.recv_json()
112 msg = self.socket.recv_json()
113 self.call_handlers(msg)
113 self.call_handlers(msg)
114
114
115 def override_call_handler(self, func):
115 def override_call_handler(self, func):
116 """Permanently override the call_handler.
116 """Permanently override the call_handler.
117
117
118 The function func will be called as::
118 The function func will be called as::
119
119
120 func(handler, msg)
120 func(handler, msg)
121
121
122 And must call::
122 And must call::
123
123
124 handler(msg)
124 handler(msg)
125
125
126 in the main thread.
126 in the main thread.
127 """
127 """
128 assert callable(func), "not a callable: %r" % func
128 assert callable(func), "not a callable: %r" % func
129 self._overriden_call_handler = func
129 self._overriden_call_handler = func
130
130
131 def call_handlers(self, msg):
131 def call_handlers(self, msg):
132 handler = self.handlers.get(msg['msg_type'], None)
132 handler = self.handlers.get(msg['msg_type'], None)
133 if handler is not None:
133 if handler is not None:
134 try:
134 try:
135 self.call_handler(handler, msg)
135 self.call_handler(handler, msg)
136 except:
136 except:
137 # XXX: This should be logged at least
137 # XXX: This should be logged at least
138 traceback.print_last()
138 traceback.print_last()
139
139
140 def call_handler(self, handler, msg):
140 def call_handler(self, handler, msg):
141 if self._overriden_call_handler is not None:
141 if self._overriden_call_handler is not None:
142 self._overriden_call_handler(handler, msg)
142 self._overriden_call_handler(handler, msg)
143 elif hasattr(self, '_call_handler'):
143 elif hasattr(self, '_call_handler'):
144 call_handler = getattr(self, '_call_handler')
144 call_handler = getattr(self, '_call_handler')
145 call_handler(handler, msg)
145 call_handler(handler, msg)
146 else:
146 else:
147 raise RuntimeError('no handler!')
147 raise RuntimeError('no handler!')
148
148
149 def add_handler(self, callback, msg_type):
149 def add_handler(self, callback, msg_type):
150 """Register a callback for msg type."""
150 """Register a callback for msg type."""
151 self.handlers[msg_type] = callback
151 self.handlers[msg_type] = callback
152
152
153 def remove_handler(self, msg_type):
153 def remove_handler(self, msg_type):
154 """Remove the callback for msg type."""
154 """Remove the callback for msg type."""
155 self.handlers.pop(msg_type, None)
155 self.handlers.pop(msg_type, None)
156
156
157 def flush(self, timeout=1.0):
157 def flush(self, timeout=1.0):
158 """Immediately processes all pending messages on the SUB channel.
158 """Immediately processes all pending messages on the SUB channel.
159
159
160 This method is thread safe.
160 This method is thread safe.
161
161
162 Parameters
162 Parameters
163 ----------
163 ----------
164 timeout : float, optional
164 timeout : float, optional
165 The maximum amount of time to spend flushing, in seconds. The
165 The maximum amount of time to spend flushing, in seconds. The
166 default is one second.
166 default is one second.
167 """
167 """
168 # We do the IOLoop callback process twice to ensure that the IOLoop
168 # We do the IOLoop callback process twice to ensure that the IOLoop
169 # gets to perform at least one full poll.
169 # gets to perform at least one full poll.
170 stop_time = time.time() + timeout
170 stop_time = time.time() + timeout
171 for i in xrange(2):
171 for i in xrange(2):
172 self._flushed = False
172 self._flushed = False
173 self.ioloop.add_callback(self._flush)
173 self.ioloop.add_callback(self._flush)
174 while not self._flushed and time.time() < stop_time:
174 while not self._flushed and time.time() < stop_time:
175 time.sleep(0.01)
175 time.sleep(0.01)
176
176
177 def _flush(self):
177 def _flush(self):
178 """Called in this thread by the IOLoop to indicate that all events have
178 """Called in this thread by the IOLoop to indicate that all events have
179 been processed.
179 been processed.
180 """
180 """
181 self._flushed = True
181 self._flushed = True
182
182
183
183
184 class XReqSocketChannel(ZmqSocketChannel):
184 class XReqSocketChannel(ZmqSocketChannel):
185
185
186 handler_queue = None
186 handler_queue = None
187 command_queue = None
187 command_queue = None
188 handlers = None
188 handlers = None
189 _overriden_call_handler = None
189 _overriden_call_handler = None
190
190
191 def __init__(self, context, session, address=None):
191 def __init__(self, context, session, address=None):
192 self.handlers = {}
192 self.handlers = {}
193 self.handler_queue = Queue()
193 self.handler_queue = Queue()
194 self.command_queue = Queue()
194 self.command_queue = Queue()
195 super(XReqSocketChannel, self).__init__(context, session, address)
195 super(XReqSocketChannel, self).__init__(context, session, address)
196
196
197 def run(self):
197 def run(self):
198 self.socket = self.context.socket(zmq.XREQ)
198 self.socket = self.context.socket(zmq.XREQ)
199 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
199 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
200 self.socket.connect('tcp://%s:%i' % self.address)
200 self.socket.connect('tcp://%s:%i' % self.address)
201 self.ioloop = ioloop.IOLoop()
201 self.ioloop = ioloop.IOLoop()
202 self.ioloop.add_handler(self.socket, self._handle_events,
202 self.ioloop.add_handler(self.socket, self._handle_events,
203 POLLIN|POLLOUT|POLLERR)
203 POLLIN|POLLOUT|POLLERR)
204 self.ioloop.start()
204 self.ioloop.start()
205
205
206 def stop(self):
206 def stop(self):
207 self.ioloop.stop()
207 self.ioloop.stop()
208 super(XReqSocketChannel, self).stop()
208 super(XReqSocketChannel, self).stop()
209
209
210 def _handle_events(self, socket, events):
210 def _handle_events(self, socket, events):
211 # Turn on and off POLLOUT depending on if we have made a request
211 # Turn on and off POLLOUT depending on if we have made a request
212 if events & POLLERR:
212 if events & POLLERR:
213 self._handle_err()
213 self._handle_err()
214 if events & POLLOUT:
214 if events & POLLOUT:
215 self._handle_send()
215 self._handle_send()
216 if events & POLLIN:
216 if events & POLLIN:
217 self._handle_recv()
217 self._handle_recv()
218
218
219 def _handle_recv(self):
219 def _handle_recv(self):
220 msg = self.socket.recv_json()
220 msg = self.socket.recv_json()
221 self.call_handlers(msg)
221 self.call_handlers(msg)
222
222
223 def _handle_send(self):
223 def _handle_send(self):
224 try:
224 try:
225 msg = self.command_queue.get(False)
225 msg = self.command_queue.get(False)
226 except Empty:
226 except Empty:
227 pass
227 pass
228 else:
228 else:
229 self.socket.send_json(msg)
229 self.socket.send_json(msg)
230
230
231 def _handle_err(self):
231 def _handle_err(self):
232 raise zmq.ZmqError()
232 raise zmq.ZmqError()
233
233
234 def _queue_request(self, msg, callback):
234 def _queue_request(self, msg, callback):
235 handler = self._find_handler(msg['msg_type'], callback)
235 handler = self._find_handler(msg['msg_type'], callback)
236 self.handler_queue.put(handler)
236 self.handler_queue.put(handler)
237 self.command_queue.put(msg)
237 self.command_queue.put(msg)
238
238
239 def execute(self, code, callback=None):
239 def execute(self, code, callback=None):
240 # Create class for content/msg creation. Related to, but possibly
240 # Create class for content/msg creation. Related to, but possibly
241 # not in Session.
241 # not in Session.
242 content = dict(code=code)
242 content = dict(code=code)
243 msg = self.session.msg('execute_request', content)
243 msg = self.session.msg('execute_request', content)
244 self._queue_request(msg, callback)
244 self._queue_request(msg, callback)
245 return msg['header']['msg_id']
245 return msg['header']['msg_id']
246
246
247 def complete(self, text, line, block=None, callback=None):
247 def complete(self, text, line, block=None, callback=None):
248 content = dict(text=text, line=line)
248 content = dict(text=text, line=line)
249 msg = self.session.msg('complete_request', content)
249 msg = self.session.msg('complete_request', content)
250 self._queue_request(msg, callback)
250 self._queue_request(msg, callback)
251 return msg['header']['msg_id']
251 return msg['header']['msg_id']
252
252
253 def object_info(self, oname, callback=None):
253 def object_info(self, oname, callback=None):
254 content = dict(oname=oname)
254 content = dict(oname=oname)
255 msg = self.session.msg('object_info_request', content)
255 msg = self.session.msg('object_info_request', content)
256 self._queue_request(msg, callback)
256 self._queue_request(msg, callback)
257 return msg['header']['msg_id']
257 return msg['header']['msg_id']
258
258
259 def _find_handler(self, name, callback):
259 def _find_handler(self, name, callback):
260 if callback is not None:
260 if callback is not None:
261 return callback
261 return callback
262 handler = self.handlers.get(name)
262 handler = self.handlers.get(name)
263 if handler is None:
263 if handler is None:
264 raise MissingHandlerError(
264 raise MissingHandlerError(
265 'No handler defined for method: %s' % name)
265 'No handler defined for method: %s' % name)
266 return handler
266 return handler
267
267
268 def override_call_handler(self, func):
268 def override_call_handler(self, func):
269 """Permanently override the call_handler.
269 """Permanently override the call_handler.
270
270
271 The function func will be called as::
271 The function func will be called as::
272
272
273 func(handler, msg)
273 func(handler, msg)
274
274
275 And must call::
275 And must call::
276
276
277 handler(msg)
277 handler(msg)
278
278
279 in the main thread.
279 in the main thread.
280 """
280 """
281 assert callable(func), "not a callable: %r" % func
281 assert callable(func), "not a callable: %r" % func
282 self._overriden_call_handler = func
282 self._overriden_call_handler = func
283
283
284 def call_handlers(self, msg):
284 def call_handlers(self, msg):
285 try:
285 try:
286 handler = self.handler_queue.get(False)
286 handler = self.handler_queue.get(False)
287 except Empty:
287 except Empty:
288 print "Message received with no handler!!!"
288 print "Message received with no handler!!!"
289 print msg
289 print msg
290 else:
290 else:
291 self.call_handler(handler, msg)
291 self.call_handler(handler, msg)
292
292
293 def call_handler(self, handler, msg):
293 def call_handler(self, handler, msg):
294 if self._overriden_call_handler is not None:
294 if self._overriden_call_handler is not None:
295 self._overriden_call_handler(handler, msg)
295 self._overriden_call_handler(handler, msg)
296 elif hasattr(self, '_call_handler'):
296 elif hasattr(self, '_call_handler'):
297 call_handler = getattr(self, '_call_handler')
297 call_handler = getattr(self, '_call_handler')
298 call_handler(handler, msg)
298 call_handler(handler, msg)
299 else:
299 else:
300 raise RuntimeError('no handler!')
300 raise RuntimeError('no handler!')
301
301
302
302
303 class RepSocketChannel(ZmqSocketChannel):
303 class RepSocketChannel(ZmqSocketChannel):
304
304
305 def on_raw_input(self):
305 def on_raw_input(self):
306 pass
306 pass
307
307
308
308
309 class KernelManager(HasTraits):
309 class KernelManager(HasTraits):
310 """ Manages a kernel for a frontend.
310 """ Manages a kernel for a frontend.
311
311
312 The SUB channel is for the frontend to receive messages published by the
312 The SUB channel is for the frontend to receive messages published by the
313 kernel.
313 kernel.
314
314
315 The REQ channel is for the frontend to make requests of the kernel.
315 The REQ channel is for the frontend to make requests of the kernel.
316
316
317 The REP channel is for the kernel to request stdin (raw_input) from the
317 The REP channel is for the kernel to request stdin (raw_input) from the
318 frontend.
318 frontend.
319 """
319 """
320
320
321 # Whether the kernel manager is currently listening on its channels.
321 # Whether the kernel manager is currently listening on its channels.
322 is_listening = Bool(False)
322 is_listening = Bool(False)
323
323
324 # The PyZMQ Context to use for communication with the kernel.
324 # The PyZMQ Context to use for communication with the kernel.
325 context = Instance(zmq.Context, ())
325 context = Instance(zmq.Context, ())
326
326
327 # The Session to use for communication with the kernel.
327 # The Session to use for communication with the kernel.
328 session = Instance(Session, ())
328 session = Instance(Session, ())
329
329
330 # The classes to use for the various channels.
330 # The classes to use for the various channels.
331 sub_channel_class = Type(SubSocketChannel)
331 sub_channel_class = Type(SubSocketChannel)
332 xreq_channel_class = Type(XReqSocketChannel)
332 xreq_channel_class = Type(XReqSocketChannel)
333 rep_channel_class = Type(RepSocketChannel)
333 rep_channel_class = Type(RepSocketChannel)
334
334
335 # Protected traits.
335 # Protected traits.
336 _kernel = Instance(Popen)
336 _kernel = Instance(Popen)
337 _sub_channel = Any
337 _sub_channel = Any
338 _xreq_channel = Any
338 _xreq_channel = Any
339 _rep_channel = Any
339 _rep_channel = Any
340
340
341 def __init__(self, **traits):
341 #--------------------------------------------------------------------------
342 super(KernelManager, self).__init__()
342 # Channel management methods:
343
343 #--------------------------------------------------------------------------
344 # FIXME: This should be the business of HasTraits. The convention is:
345 # HasTraits.__init__(self, **traits_to_be_initialized.)
346 for trait in traits:
347 setattr(self, trait, traits[trait])
348
344
349 def start_listening(self):
345 def start_listening(self):
350 """Start listening on the specified ports. If already listening, raises
346 """Starts listening on the specified ports. If already listening, raises
351 a RuntimeError.
347 a RuntimeError.
352 """
348 """
353 if self.is_listening:
349 if self.is_listening:
354 raise RuntimeError("Cannot start listening. Already listening!")
350 raise RuntimeError("Cannot start listening. Already listening!")
355 else:
351 else:
356 self.is_listening = True
352 self.is_listening = True
357 self.sub_channel.start()
353 self.sub_channel.start()
358 self.xreq_channel.start()
354 self.xreq_channel.start()
359 self.rep_channel.start()
355 self.rep_channel.start()
360
356
357 @property
358 def is_alive(self):
359 """ Returns whether the kernel is alive. """
360 if self.is_listening:
361 # TODO: check if alive.
362 return True
363 else:
364 return False
365
361 def stop_listening(self):
366 def stop_listening(self):
362 """Stop listening. If not listening, does nothing. """
367 """Stops listening. If not listening, does nothing. """
363 if self.is_listening:
368 if self.is_listening:
364 self.is_listening = False
369 self.is_listening = False
365 self.sub_channel.stop()
370 self.sub_channel.stop()
366 self.xreq_channel.stop()
371 self.xreq_channel.stop()
367 self.rep_channel.stop()
372 self.rep_channel.stop()
368
373
374 #--------------------------------------------------------------------------
375 # Kernel process management methods:
376 #--------------------------------------------------------------------------
377
369 def start_kernel(self):
378 def start_kernel(self):
370 """Start a localhost kernel. If ports have been specified via the
379 """Starts a kernel process and configures the manager to use it.
371 address attributes, use them. Otherwise, choose open ports at random.
380
381 If ports have been specified via the address attributes, they are used.
382 Otherwise, open ports are chosen by the OS and the channel port
383 attributes are configured as appropriate.
372 """
384 """
373 xreq, sub = self.xreq_address, self.sub_address
385 xreq, sub = self.xreq_address, self.sub_address
374 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
386 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
375 raise RuntimeError("Can only launch a kernel on localhost."
387 raise RuntimeError("Can only launch a kernel on localhost."
376 "Make sure that the '*_address' attributes are "
388 "Make sure that the '*_address' attributes are "
377 "configured properly.")
389 "configured properly.")
378
390
379 self._kernel, xrep, pub = launch_kernel(xrep_port=xreq[1],
391 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
380 pub_port=sub[1])
392 self.set_kernel(kernel)
381 self.xreq_address = (LOCALHOST, xrep)
393 self.xreq_address = (LOCALHOST, xrep)
382 self.sub_address = (LOCALHOST, pub)
394 self.sub_address = (LOCALHOST, pub)
383
395
384 def kill_kernel(self):
396 def set_kernel(self, kernel):
385 """Kill the running kernel, if there is one.
397 """Sets the kernel manager's kernel to an existing kernel process.
398
399 It is *not* necessary to a set a kernel to communicate with it via the
400 channels, and those objects must be configured separately. It
401 *is* necessary to set a kernel if you want to use the manager (or
402 frontends that use the manager) to signal and/or kill the kernel.
403
404 Parameters:
405 -----------
406 kernel : Popen
407 An existing kernel process.
408 """
409 self._kernel = kernel
410
411 @property
412 def has_kernel(self):
413 """Returns whether a kernel process has been specified for the kernel
414 manager.
415
416 A kernel process can be set via 'start_kernel' or 'set_kernel'.
386 """
417 """
418 return self._kernel is not None
419
420 def kill_kernel(self):
421 """ Kill the running kernel. """
387 if self._kernel:
422 if self._kernel:
388 self._kernel.kill()
423 self._kernel.kill()
389 self._kernel = None
424 self._kernel = None
390
391 @property
392 def is_alive(self):
393 """ Returns whether the kernel is alive. """
394 if self.is_listening:
395 # TODO: check if alive.
396 return True
397 else:
425 else:
398 return False
426 raise RuntimeError("Cannot kill kernel. No kernel is running!")
399
427
400 def signal_kernel(self, signum):
428 def signal_kernel(self, signum):
401 """Send signum to the kernel."""
429 """ Sends a signal to the kernel. """
402 # TODO: signal the kernel.
430 if self._kernel:
431 self._kernel.send_signal(signum)
432 else:
433 raise RuntimeError("Cannot signal kernel. No kernel is running!")
403
434
404 #--------------------------------------------------------------------------
435 #--------------------------------------------------------------------------
405 # Channels used for communication with the kernel:
436 # Channels used for communication with the kernel:
406 #--------------------------------------------------------------------------
437 #--------------------------------------------------------------------------
407
438
408 @property
439 @property
409 def sub_channel(self):
440 def sub_channel(self):
410 """Get the SUB socket channel object."""
441 """Get the SUB socket channel object."""
411 if self._sub_channel is None:
442 if self._sub_channel is None:
412 self._sub_channel = self.sub_channel_class(self.context,
443 self._sub_channel = self.sub_channel_class(self.context,
413 self.session)
444 self.session)
414 return self._sub_channel
445 return self._sub_channel
415
446
416 @property
447 @property
417 def xreq_channel(self):
448 def xreq_channel(self):
418 """Get the REQ socket channel object to make requests of the kernel."""
449 """Get the REQ socket channel object to make requests of the kernel."""
419 if self._xreq_channel is None:
450 if self._xreq_channel is None:
420 self._xreq_channel = self.xreq_channel_class(self.context,
451 self._xreq_channel = self.xreq_channel_class(self.context,
421 self.session)
452 self.session)
422 return self._xreq_channel
453 return self._xreq_channel
423
454
424 @property
455 @property
425 def rep_channel(self):
456 def rep_channel(self):
426 """Get the REP socket channel object to handle stdin (raw_input)."""
457 """Get the REP socket channel object to handle stdin (raw_input)."""
427 if self._rep_channel is None:
458 if self._rep_channel is None:
428 self._rep_channel = self.rep_channel_class(self.context,
459 self._rep_channel = self.rep_channel_class(self.context,
429 self.session)
460 self.session)
430 return self._rep_channel
461 return self._rep_channel
431
462
432 #--------------------------------------------------------------------------
463 #--------------------------------------------------------------------------
433 # Channel address attributes:
464 # Delegates for the Channel address attributes:
434 #--------------------------------------------------------------------------
465 #--------------------------------------------------------------------------
435
466
436 def get_sub_address(self):
467 def get_sub_address(self):
437 return self.sub_channel.address
468 return self.sub_channel.address
438
469
439 def set_sub_address(self, address):
470 def set_sub_address(self, address):
440 self.sub_channel.address = address
471 self.sub_channel.address = address
441
472
442 sub_address = property(get_sub_address, set_sub_address,
473 sub_address = property(get_sub_address, set_sub_address,
443 doc="The address used by SUB socket channel.")
474 doc="The address used by SUB socket channel.")
444
475
445 def get_xreq_address(self):
476 def get_xreq_address(self):
446 return self.xreq_channel.address
477 return self.xreq_channel.address
447
478
448 def set_xreq_address(self, address):
479 def set_xreq_address(self, address):
449 self.xreq_channel.address = address
480 self.xreq_channel.address = address
450
481
451 xreq_address = property(get_xreq_address, set_xreq_address,
482 xreq_address = property(get_xreq_address, set_xreq_address,
452 doc="The address used by XREQ socket channel.")
483 doc="The address used by XREQ socket channel.")
453
484
454 def get_rep_address(self):
485 def get_rep_address(self):
455 return self.rep_channel.address
486 return self.rep_channel.address
456
487
457 def set_rep_address(self, address):
488 def set_rep_address(self, address):
458 self.rep_channel.address = address
489 self.rep_channel.address = address
459
490
460 rep_address = property(get_rep_address, set_rep_address,
491 rep_address = property(get_rep_address, set_rep_address,
461 doc="The address used by REP socket channel.")
492 doc="The address used by REP socket channel.")
462
493
General Comments 0
You need to be logged in to leave comments. Login now