##// END OF EJS Templates
General cleanup of kernelmanager.py....
Brian Granger -
Show More
@@ -1,492 +1,458 b''
1 """Kernel frontend classes.
1 """Kernel frontend classes.
2
2
3 TODO: Create logger to handle debugging and console messages.
3 TODO: Create logger to handle debugging and console messages.
4
4
5 """
5 """
6
6
7 # Standard library imports.
7 # Standard library imports.
8 from Queue import Queue, Empty
8 from Queue import Queue, Empty
9 from subprocess import Popen
9 from subprocess import Popen
10 from threading import Thread
10 from threading import Thread
11 import time
11 import time
12 import traceback
12 import traceback
13
13
14 # System library imports.
14 # System library imports.
15 import zmq
15 import zmq
16 from zmq import POLLIN, POLLOUT, POLLERR
16 from zmq import POLLIN, POLLOUT, POLLERR
17 from zmq.eventloop import ioloop
17 from zmq.eventloop import ioloop
18
18
19 # Local imports.
19 # Local imports.
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
21 Type
21 Type
22 from kernel import launch_kernel
22 from kernel import launch_kernel
23 from session import Session
23 from session import Session
24
24
25 # Constants.
25 # Constants.
26 LOCALHOST = '127.0.0.1'
26 LOCALHOST = '127.0.0.1'
27
27
28
28
29 class MissingHandlerError(Exception):
29 class MissingHandlerError(Exception):
30 pass
30 pass
31
31
32
32
33 class ZmqSocketChannel(Thread):
33 class ZmqSocketChannel(Thread):
34 """ The base class for the channels that use ZMQ sockets.
34 """ The base class for the channels that use ZMQ sockets.
35 """
35 """
36
36
37 def __init__(self, context, session, address=None):
37 def __init__(self, context, session, address=None):
38 super(ZmqSocketChannel, self).__init__()
38 super(ZmqSocketChannel, self).__init__()
39 self.daemon = True
39 self.daemon = True
40
40
41 self.context = context
41 self.context = context
42 self.session = session
42 self.session = session
43 self.address = address
43 self.address = address
44 self.socket = None
44 self.socket = None
45
45
46 def stop(self):
46 def stop(self):
47 """Stop the thread's activity. Returns when the thread terminates.
47 """Stop the thread's activity. Returns when the thread terminates.
48
48
49 The thread will raise :class:`RuntimeError` if :method:`self.start`
49 The thread will raise :class:`RuntimeError` if :method:`self.start`
50 is called again.
50 is called again.
51 """
51 """
52 self.join()
52 self.join()
53
53
54
54
55 def get_address(self):
55 def get_address(self):
56 """ Get the channel's address. By the default, a channel is on
56 """ Get the channel's address. By the default, a channel is on
57 localhost with no port specified (a negative port number).
57 localhost with no port specified (a negative port number).
58 """
58 """
59 return self._address
59 return self._address
60
60
61 def set_adresss(self, address):
61 def set_adresss(self, address):
62 """ Set the channel's address. Should be a tuple of form:
62 """ Set the channel's address. Should be a tuple of form:
63 (ip address [str], port [int]).
63 (ip address [str], port [int]).
64 or None, in which case the address is reset to its default value.
64 or None, in which case the address is reset to its default value.
65 """
65 """
66 # FIXME: Validate address.
66 # FIXME: Validate address.
67 if self.is_alive(): # This is Thread.is_alive
67 if self.is_alive(): # This is Thread.is_alive
68 raise RuntimeError("Cannot set address on a running channel!")
68 raise RuntimeError("Cannot set address on a running channel!")
69 else:
69 else:
70 if address is None:
70 if address is None:
71 address = (LOCALHOST, 0)
71 address = (LOCALHOST, 0)
72 self._address = address
72 self._address = address
73
73
74 address = property(get_address, set_adresss)
74 address = property(get_address, set_adresss)
75
75
76
76
77 class SubSocketChannel(ZmqSocketChannel):
77 class SubSocketChannel(ZmqSocketChannel):
78
78
79 handlers = None
80 _overriden_call_handler = None
81
82 def __init__(self, context, session, address=None):
79 def __init__(self, context, session, address=None):
83 self.handlers = {}
84 super(SubSocketChannel, self).__init__(context, session, address)
80 super(SubSocketChannel, self).__init__(context, session, address)
85
81
86 def run(self):
82 def run(self):
87 self.socket = self.context.socket(zmq.SUB)
83 self.socket = self.context.socket(zmq.SUB)
88 self.socket.setsockopt(zmq.SUBSCRIBE,'')
84 self.socket.setsockopt(zmq.SUBSCRIBE,'')
89 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
85 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
90 self.socket.connect('tcp://%s:%i' % self.address)
86 self.socket.connect('tcp://%s:%i' % self.address)
91 self.ioloop = ioloop.IOLoop()
87 self.ioloop = ioloop.IOLoop()
92 self.ioloop.add_handler(self.socket, self._handle_events,
88 self.ioloop.add_handler(self.socket, self._handle_events,
93 POLLIN|POLLERR)
89 POLLIN|POLLERR)
94 self.ioloop.start()
90 self.ioloop.start()
95
91
96 def stop(self):
92 def stop(self):
97 self.ioloop.stop()
93 self.ioloop.stop()
98 super(SubSocketChannel, self).stop()
94 super(SubSocketChannel, self).stop()
99
95
100 def _handle_events(self, socket, events):
96 def _handle_events(self, socket, events):
101 # Turn on and off POLLOUT depending on if we have made a request
97 # Turn on and off POLLOUT depending on if we have made a request
102 if events & POLLERR:
98 if events & POLLERR:
103 self._handle_err()
99 self._handle_err()
104 if events & POLLIN:
100 if events & POLLIN:
105 self._handle_recv()
101 self._handle_recv()
106
102
107 def _handle_err(self):
103 def _handle_err(self):
104 # We don't want to let this go silently, so eventually we should log.
108 raise zmq.ZmqError()
105 raise zmq.ZmqError()
109
106
110 def _handle_recv(self):
107 def _handle_recv(self):
111 msg = self.socket.recv_json()
108 msg = self.socket.recv_json()
112 self.call_handlers(msg)
109 self.call_handlers(msg)
113
110
114 def override_call_handler(self, func):
115 """Permanently override the call_handler.
116
117 The function func will be called as::
118
119 func(handler, msg)
120
121 And must call::
122
123 handler(msg)
124
125 in the main thread.
126 """
127 assert callable(func), "not a callable: %r" % func
128 self._overriden_call_handler = func
129
130 def call_handlers(self, msg):
111 def call_handlers(self, msg):
131 handler = self.handlers.get(msg['msg_type'], None)
112 """This method is called in the ioloop thread when a message arrives.
132 if handler is not None:
133 try:
134 self.call_handler(handler, msg)
135 except:
136 # XXX: This should be logged at least
137 traceback.print_last()
138
113
139 def call_handler(self, handler, msg):
114 Subclasses should override this method to handle incoming messages.
140 if self._overriden_call_handler is not None:
115 It is important to remember that this method is called in the thread
141 self._overriden_call_handler(handler, msg)
116 so that some logic must be done to ensure that the application leve
142 elif hasattr(self, '_call_handler'):
117 handlers are called in the application thread.
143 call_handler = getattr(self, '_call_handler')
118 """
144 call_handler(handler, msg)
119 raise NotImplementedError('call_handlers must be defined in a subclass.')
145 else:
146 raise RuntimeError('no handler!')
147
148 def add_handler(self, callback, msg_type):
149 """Register a callback for msg type."""
150 self.handlers[msg_type] = callback
151
152 def remove_handler(self, msg_type):
153 """Remove the callback for msg type."""
154 self.handlers.pop(msg_type, None)
155
120
156 def flush(self, timeout=1.0):
121 def flush(self, timeout=1.0):
157 """Immediately processes all pending messages on the SUB channel.
122 """Immediately processes all pending messages on the SUB channel.
158
123
159 This method is thread safe.
124 This method is thread safe.
160
125
161 Parameters
126 Parameters
162 ----------
127 ----------
163 timeout : float, optional
128 timeout : float, optional
164 The maximum amount of time to spend flushing, in seconds. The
129 The maximum amount of time to spend flushing, in seconds. The
165 default is one second.
130 default is one second.
166 """
131 """
167 # We do the IOLoop callback process twice to ensure that the IOLoop
132 # We do the IOLoop callback process twice to ensure that the IOLoop
168 # gets to perform at least one full poll.
133 # gets to perform at least one full poll.
169 stop_time = time.time() + timeout
134 stop_time = time.time() + timeout
170 for i in xrange(2):
135 for i in xrange(2):
171 self._flushed = False
136 self._flushed = False
172 self.ioloop.add_callback(self._flush)
137 self.ioloop.add_callback(self._flush)
173 while not self._flushed and time.time() < stop_time:
138 while not self._flushed and time.time() < stop_time:
174 time.sleep(0.01)
139 time.sleep(0.01)
175
140
176 def _flush(self):
141 def _flush(self):
177 """Called in this thread by the IOLoop to indicate that all events have
142 """Called in this thread by the IOLoop to indicate that all events have
178 been processed.
143 been processed.
179 """
144 """
180 self._flushed = True
145 self._flushed = True
181
146
182
147
183 class XReqSocketChannel(ZmqSocketChannel):
148 class XReqSocketChannel(ZmqSocketChannel):
184
149
185 handler_queue = None
150 handler_queue = None
186 command_queue = None
151 command_queue = None
187 handlers = None
152 handlers = None
188 _overriden_call_handler = None
153 _overriden_call_handler = None
189
154
190 def __init__(self, context, session, address=None):
155 def __init__(self, context, session, address=None):
191 self.handlers = {}
156 self.handlers = {}
192 self.handler_queue = Queue()
157 self.handler_queue = Queue()
193 self.command_queue = Queue()
158 self.command_queue = Queue()
194 super(XReqSocketChannel, self).__init__(context, session, address)
159 super(XReqSocketChannel, self).__init__(context, session, address)
195
160
196 def run(self):
161 def run(self):
197 self.socket = self.context.socket(zmq.XREQ)
162 self.socket = self.context.socket(zmq.XREQ)
198 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
163 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
199 self.socket.connect('tcp://%s:%i' % self.address)
164 self.socket.connect('tcp://%s:%i' % self.address)
200 self.ioloop = ioloop.IOLoop()
165 self.ioloop = ioloop.IOLoop()
201 self.ioloop.add_handler(self.socket, self._handle_events,
166 self.ioloop.add_handler(self.socket, self._handle_events,
202 POLLIN|POLLOUT|POLLERR)
167 POLLIN|POLLOUT|POLLERR)
203 self.ioloop.start()
168 self.ioloop.start()
204
169
205 def stop(self):
170 def stop(self):
206 self.ioloop.stop()
171 self.ioloop.stop()
207 super(XReqSocketChannel, self).stop()
172 super(XReqSocketChannel, self).stop()
208
173
209 def _handle_events(self, socket, events):
174 def _handle_events(self, socket, events):
210 # Turn on and off POLLOUT depending on if we have made a request
175 # Turn on and off POLLOUT depending on if we have made a request
211 if events & POLLERR:
176 if events & POLLERR:
212 self._handle_err()
177 self._handle_err()
213 if events & POLLOUT:
178 if events & POLLOUT:
214 self._handle_send()
179 self._handle_send()
215 if events & POLLIN:
180 if events & POLLIN:
216 self._handle_recv()
181 self._handle_recv()
217
182
218 def _handle_recv(self):
183 def _handle_recv(self):
219 msg = self.socket.recv_json()
184 msg = self.socket.recv_json()
220 self.call_handlers(msg)
185 self.call_handlers(msg)
221
186
222 def _handle_send(self):
187 def _handle_send(self):
223 try:
188 try:
224 msg = self.command_queue.get(False)
189 msg = self.command_queue.get(False)
225 except Empty:
190 except Empty:
226 pass
191 pass
227 else:
192 else:
228 self.socket.send_json(msg)
193 self.socket.send_json(msg)
229
194
230 def _handle_err(self):
195 def _handle_err(self):
196 # We don't want to let this go silently, so eventually we should log.
231 raise zmq.ZmqError()
197 raise zmq.ZmqError()
232
198
233 def _queue_request(self, msg, callback):
199 def _queue_request(self, msg, callback):
234 handler = self._find_handler(msg['msg_type'], callback)
200 handler = self._find_handler(msg['msg_type'], callback)
235 self.handler_queue.put(handler)
201 self.handler_queue.put(handler)
236 self.command_queue.put(msg)
202 self.command_queue.put(msg)
237
203
238 def execute(self, code, callback=None):
204 def execute(self, code, callback=None):
239 # Create class for content/msg creation. Related to, but possibly
205 # Create class for content/msg creation. Related to, but possibly
240 # not in Session.
206 # not in Session.
241 content = dict(code=code)
207 content = dict(code=code)
242 msg = self.session.msg('execute_request', content)
208 msg = self.session.msg('execute_request', content)
243 self._queue_request(msg, callback)
209 self._queue_request(msg, callback)
244 return msg['header']['msg_id']
210 return msg['header']['msg_id']
245
211
246 def complete(self, text, line, block=None, callback=None):
212 def complete(self, text, line, block=None, callback=None):
247 content = dict(text=text, line=line)
213 content = dict(text=text, line=line)
248 msg = self.session.msg('complete_request', content)
214 msg = self.session.msg('complete_request', content)
249 self._queue_request(msg, callback)
215 self._queue_request(msg, callback)
250 return msg['header']['msg_id']
216 return msg['header']['msg_id']
251
217
252 def object_info(self, oname, callback=None):
218 def object_info(self, oname, callback=None):
253 content = dict(oname=oname)
219 content = dict(oname=oname)
254 msg = self.session.msg('object_info_request', content)
220 msg = self.session.msg('object_info_request', content)
255 self._queue_request(msg, callback)
221 self._queue_request(msg, callback)
256 return msg['header']['msg_id']
222 return msg['header']['msg_id']
257
223
258 def _find_handler(self, name, callback):
224 def _find_handler(self, name, callback):
259 if callback is not None:
225 if callback is not None:
260 return callback
226 return callback
261 handler = self.handlers.get(name)
227 handler = self.handlers.get(name)
262 if handler is None:
228 if handler is None:
263 raise MissingHandlerError(
229 raise MissingHandlerError(
264 'No handler defined for method: %s' % name)
230 'No handler defined for method: %s' % name)
265 return handler
231 return handler
266
232
267 def override_call_handler(self, func):
233 def override_call_handler(self, func):
268 """Permanently override the call_handler.
234 """Permanently override the call_handler.
269
235
270 The function func will be called as::
236 The function func will be called as::
271
237
272 func(handler, msg)
238 func(handler, msg)
273
239
274 And must call::
240 And must call::
275
241
276 handler(msg)
242 handler(msg)
277
243
278 in the main thread.
244 in the main thread.
279 """
245 """
280 assert callable(func), "not a callable: %r" % func
246 assert callable(func), "not a callable: %r" % func
281 self._overriden_call_handler = func
247 self._overriden_call_handler = func
282
248
283 def call_handlers(self, msg):
249 def call_handlers(self, msg):
284 try:
250 try:
285 handler = self.handler_queue.get(False)
251 handler = self.handler_queue.get(False)
286 except Empty:
252 except Empty:
287 print "Message received with no handler!!!"
253 print "Message received with no handler!!!"
288 print msg
254 print msg
289 else:
255 else:
290 self.call_handler(handler, msg)
256 self.call_handler(handler, msg)
291
257
292 def call_handler(self, handler, msg):
258 def call_handler(self, handler, msg):
293 if self._overriden_call_handler is not None:
259 if self._overriden_call_handler is not None:
294 self._overriden_call_handler(handler, msg)
260 self._overriden_call_handler(handler, msg)
295 elif hasattr(self, '_call_handler'):
261 elif hasattr(self, '_call_handler'):
296 call_handler = getattr(self, '_call_handler')
262 call_handler = getattr(self, '_call_handler')
297 call_handler(handler, msg)
263 call_handler(handler, msg)
298 else:
264 else:
299 raise RuntimeError('no handler!')
265 raise RuntimeError('no handler!')
300
266
301
267
302 class RepSocketChannel(ZmqSocketChannel):
268 class RepSocketChannel(ZmqSocketChannel):
303
269
304 def on_raw_input(self):
270 def on_raw_input(self):
305 pass
271 pass
306
272
307
273
308 class KernelManager(HasTraits):
274 class KernelManager(HasTraits):
309 """ Manages a kernel for a frontend.
275 """ Manages a kernel for a frontend.
310
276
311 The SUB channel is for the frontend to receive messages published by the
277 The SUB channel is for the frontend to receive messages published by the
312 kernel.
278 kernel.
313
279
314 The REQ channel is for the frontend to make requests of the kernel.
280 The REQ channel is for the frontend to make requests of the kernel.
315
281
316 The REP channel is for the kernel to request stdin (raw_input) from the
282 The REP channel is for the kernel to request stdin (raw_input) from the
317 frontend.
283 frontend.
318 """
284 """
319
285
320 # Whether the kernel manager is currently listening on its channels.
286 # Whether the kernel manager is currently listening on its channels.
321 is_listening = Bool(False)
287 is_listening = Bool(False)
322
288
323 # The PyZMQ Context to use for communication with the kernel.
289 # The PyZMQ Context to use for communication with the kernel.
324 context = Instance(zmq.Context, ())
290 context = Instance(zmq.Context, ())
325
291
326 # The Session to use for communication with the kernel.
292 # The Session to use for communication with the kernel.
327 session = Instance(Session, ())
293 session = Instance(Session, ())
328
294
329 # The classes to use for the various channels.
295 # The classes to use for the various channels.
330 sub_channel_class = Type(SubSocketChannel)
296 sub_channel_class = Type(SubSocketChannel)
331 xreq_channel_class = Type(XReqSocketChannel)
297 xreq_channel_class = Type(XReqSocketChannel)
332 rep_channel_class = Type(RepSocketChannel)
298 rep_channel_class = Type(RepSocketChannel)
333
299
334 # Protected traits.
300 # Protected traits.
335 _kernel = Instance(Popen)
301 _kernel = Instance(Popen)
336 _sub_channel = Any
302 _sub_channel = Any
337 _xreq_channel = Any
303 _xreq_channel = Any
338 _rep_channel = Any
304 _rep_channel = Any
339
305
340 #--------------------------------------------------------------------------
306 #--------------------------------------------------------------------------
341 # Channel management methods:
307 # Channel management methods:
342 #--------------------------------------------------------------------------
308 #--------------------------------------------------------------------------
343
309
344 def start_listening(self):
310 def start_listening(self):
345 """Starts listening on the specified ports. If already listening, raises
311 """Starts listening on the specified ports. If already listening, raises
346 a RuntimeError.
312 a RuntimeError.
347 """
313 """
348 if self.is_listening:
314 if self.is_listening:
349 raise RuntimeError("Cannot start listening. Already listening!")
315 raise RuntimeError("Cannot start listening. Already listening!")
350 else:
316 else:
351 self.is_listening = True
317 self.is_listening = True
352 self.sub_channel.start()
318 self.sub_channel.start()
353 self.xreq_channel.start()
319 self.xreq_channel.start()
354 self.rep_channel.start()
320 self.rep_channel.start()
355
321
356 @property
322 @property
357 def is_alive(self):
323 def is_alive(self):
358 """ Returns whether the kernel is alive. """
324 """ Returns whether the kernel is alive. """
359 if self.is_listening:
325 if self.is_listening:
360 # TODO: check if alive.
326 # TODO: check if alive.
361 return True
327 return True
362 else:
328 else:
363 return False
329 return False
364
330
365 def stop_listening(self):
331 def stop_listening(self):
366 """Stops listening. If not listening, does nothing. """
332 """Stops listening. If not listening, does nothing. """
367 if self.is_listening:
333 if self.is_listening:
368 self.is_listening = False
334 self.is_listening = False
369 self.sub_channel.stop()
335 self.sub_channel.stop()
370 self.xreq_channel.stop()
336 self.xreq_channel.stop()
371 self.rep_channel.stop()
337 self.rep_channel.stop()
372
338
373 #--------------------------------------------------------------------------
339 #--------------------------------------------------------------------------
374 # Kernel process management methods:
340 # Kernel process management methods:
375 #--------------------------------------------------------------------------
341 #--------------------------------------------------------------------------
376
342
377 def start_kernel(self):
343 def start_kernel(self):
378 """Starts a kernel process and configures the manager to use it.
344 """Starts a kernel process and configures the manager to use it.
379
345
380 If ports have been specified via the address attributes, they are used.
346 If ports have been specified via the address attributes, they are used.
381 Otherwise, open ports are chosen by the OS and the channel port
347 Otherwise, open ports are chosen by the OS and the channel port
382 attributes are configured as appropriate.
348 attributes are configured as appropriate.
383 """
349 """
384 xreq, sub = self.xreq_address, self.sub_address
350 xreq, sub = self.xreq_address, self.sub_address
385 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
351 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
386 raise RuntimeError("Can only launch a kernel on localhost."
352 raise RuntimeError("Can only launch a kernel on localhost."
387 "Make sure that the '*_address' attributes are "
353 "Make sure that the '*_address' attributes are "
388 "configured properly.")
354 "configured properly.")
389
355
390 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
356 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
391 self.set_kernel(kernel)
357 self.set_kernel(kernel)
392 self.xreq_address = (LOCALHOST, xrep)
358 self.xreq_address = (LOCALHOST, xrep)
393 self.sub_address = (LOCALHOST, pub)
359 self.sub_address = (LOCALHOST, pub)
394
360
395 def set_kernel(self, kernel):
361 def set_kernel(self, kernel):
396 """Sets the kernel manager's kernel to an existing kernel process.
362 """Sets the kernel manager's kernel to an existing kernel process.
397
363
398 It is *not* necessary to a set a kernel to communicate with it via the
364 It is *not* necessary to a set a kernel to communicate with it via the
399 channels, and those objects must be configured separately. It
365 channels, and those objects must be configured separately. It
400 *is* necessary to set a kernel if you want to use the manager (or
366 *is* necessary to set a kernel if you want to use the manager (or
401 frontends that use the manager) to signal and/or kill the kernel.
367 frontends that use the manager) to signal and/or kill the kernel.
402
368
403 Parameters:
369 Parameters:
404 -----------
370 -----------
405 kernel : Popen
371 kernel : Popen
406 An existing kernel process.
372 An existing kernel process.
407 """
373 """
408 self._kernel = kernel
374 self._kernel = kernel
409
375
410 @property
376 @property
411 def has_kernel(self):
377 def has_kernel(self):
412 """Returns whether a kernel process has been specified for the kernel
378 """Returns whether a kernel process has been specified for the kernel
413 manager.
379 manager.
414
380
415 A kernel process can be set via 'start_kernel' or 'set_kernel'.
381 A kernel process can be set via 'start_kernel' or 'set_kernel'.
416 """
382 """
417 return self._kernel is not None
383 return self._kernel is not None
418
384
419 def kill_kernel(self):
385 def kill_kernel(self):
420 """ Kill the running kernel. """
386 """ Kill the running kernel. """
421 if self._kernel:
387 if self._kernel:
422 self._kernel.kill()
388 self._kernel.kill()
423 self._kernel = None
389 self._kernel = None
424 else:
390 else:
425 raise RuntimeError("Cannot kill kernel. No kernel is running!")
391 raise RuntimeError("Cannot kill kernel. No kernel is running!")
426
392
427 def signal_kernel(self, signum):
393 def signal_kernel(self, signum):
428 """ Sends a signal to the kernel. """
394 """ Sends a signal to the kernel. """
429 if self._kernel:
395 if self._kernel:
430 self._kernel.send_signal(signum)
396 self._kernel.send_signal(signum)
431 else:
397 else:
432 raise RuntimeError("Cannot signal kernel. No kernel is running!")
398 raise RuntimeError("Cannot signal kernel. No kernel is running!")
433
399
434 #--------------------------------------------------------------------------
400 #--------------------------------------------------------------------------
435 # Channels used for communication with the kernel:
401 # Channels used for communication with the kernel:
436 #--------------------------------------------------------------------------
402 #--------------------------------------------------------------------------
437
403
438 @property
404 @property
439 def sub_channel(self):
405 def sub_channel(self):
440 """Get the SUB socket channel object."""
406 """Get the SUB socket channel object."""
441 if self._sub_channel is None:
407 if self._sub_channel is None:
442 self._sub_channel = self.sub_channel_class(self.context,
408 self._sub_channel = self.sub_channel_class(self.context,
443 self.session)
409 self.session)
444 return self._sub_channel
410 return self._sub_channel
445
411
446 @property
412 @property
447 def xreq_channel(self):
413 def xreq_channel(self):
448 """Get the REQ socket channel object to make requests of the kernel."""
414 """Get the REQ socket channel object to make requests of the kernel."""
449 if self._xreq_channel is None:
415 if self._xreq_channel is None:
450 self._xreq_channel = self.xreq_channel_class(self.context,
416 self._xreq_channel = self.xreq_channel_class(self.context,
451 self.session)
417 self.session)
452 return self._xreq_channel
418 return self._xreq_channel
453
419
454 @property
420 @property
455 def rep_channel(self):
421 def rep_channel(self):
456 """Get the REP socket channel object to handle stdin (raw_input)."""
422 """Get the REP socket channel object to handle stdin (raw_input)."""
457 if self._rep_channel is None:
423 if self._rep_channel is None:
458 self._rep_channel = self.rep_channel_class(self.context,
424 self._rep_channel = self.rep_channel_class(self.context,
459 self.session)
425 self.session)
460 return self._rep_channel
426 return self._rep_channel
461
427
462 #--------------------------------------------------------------------------
428 #--------------------------------------------------------------------------
463 # Delegates for the Channel address attributes:
429 # Delegates for the Channel address attributes:
464 #--------------------------------------------------------------------------
430 #--------------------------------------------------------------------------
465
431
466 def get_sub_address(self):
432 def get_sub_address(self):
467 return self.sub_channel.address
433 return self.sub_channel.address
468
434
469 def set_sub_address(self, address):
435 def set_sub_address(self, address):
470 self.sub_channel.address = address
436 self.sub_channel.address = address
471
437
472 sub_address = property(get_sub_address, set_sub_address,
438 sub_address = property(get_sub_address, set_sub_address,
473 doc="The address used by SUB socket channel.")
439 doc="The address used by SUB socket channel.")
474
440
475 def get_xreq_address(self):
441 def get_xreq_address(self):
476 return self.xreq_channel.address
442 return self.xreq_channel.address
477
443
478 def set_xreq_address(self, address):
444 def set_xreq_address(self, address):
479 self.xreq_channel.address = address
445 self.xreq_channel.address = address
480
446
481 xreq_address = property(get_xreq_address, set_xreq_address,
447 xreq_address = property(get_xreq_address, set_xreq_address,
482 doc="The address used by XREQ socket channel.")
448 doc="The address used by XREQ socket channel.")
483
449
484 def get_rep_address(self):
450 def get_rep_address(self):
485 return self.rep_channel.address
451 return self.rep_channel.address
486
452
487 def set_rep_address(self, address):
453 def set_rep_address(self, address):
488 self.rep_channel.address = address
454 self.rep_channel.address = address
489
455
490 rep_address = property(get_rep_address, set_rep_address,
456 rep_address = property(get_rep_address, set_rep_address,
491 doc="The address used by REP socket channel.")
457 doc="The address used by REP socket channel.")
492
458
General Comments 0
You need to be logged in to leave comments. Login now