##// END OF EJS Templates
SUB channel _handle_recv is now greedy....
Brian Granger -
Show More
@@ -1,458 +1,466 b''
1 """Kernel frontend classes.
1 """Kernel frontend classes.
2
2
3 TODO: Create logger to handle debugging and console messages.
3 TODO: Create logger to handle debugging and console messages.
4
4
5 """
5 """
6
6
7 # Standard library imports.
7 # Standard library imports.
8 from Queue import Queue, Empty
8 from Queue import Queue, Empty
9 from subprocess import Popen
9 from subprocess import Popen
10 from threading import Thread
10 from threading import Thread
11 import time
11 import time
12 import traceback
12 import traceback
13
13
14 # System library imports.
14 # System library imports.
15 import zmq
15 import zmq
16 from zmq import POLLIN, POLLOUT, POLLERR
16 from zmq import POLLIN, POLLOUT, POLLERR
17 from zmq.eventloop import ioloop
17 from zmq.eventloop import ioloop
18
18
19 # Local imports.
19 # Local imports.
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
20 from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \
21 Type
21 Type
22 from kernel import launch_kernel
22 from kernel import launch_kernel
23 from session import Session
23 from session import Session
24
24
25 # Constants.
25 # Constants.
26 LOCALHOST = '127.0.0.1'
26 LOCALHOST = '127.0.0.1'
27
27
28
28
29 class MissingHandlerError(Exception):
29 class MissingHandlerError(Exception):
30 pass
30 pass
31
31
32
32
33 class ZmqSocketChannel(Thread):
33 class ZmqSocketChannel(Thread):
34 """ The base class for the channels that use ZMQ sockets.
34 """ The base class for the channels that use ZMQ sockets.
35 """
35 """
36
36
37 def __init__(self, context, session, address=None):
37 def __init__(self, context, session, address=None):
38 super(ZmqSocketChannel, self).__init__()
38 super(ZmqSocketChannel, self).__init__()
39 self.daemon = True
39 self.daemon = True
40
40
41 self.context = context
41 self.context = context
42 self.session = session
42 self.session = session
43 self.address = address
43 self.address = address
44 self.socket = None
44 self.socket = None
45
45
46 def stop(self):
46 def stop(self):
47 """Stop the thread's activity. Returns when the thread terminates.
47 """Stop the thread's activity. Returns when the thread terminates.
48
48
49 The thread will raise :class:`RuntimeError` if :method:`self.start`
49 The thread will raise :class:`RuntimeError` if :method:`self.start`
50 is called again.
50 is called again.
51 """
51 """
52 self.join()
52 self.join()
53
53
54
54
55 def get_address(self):
55 def get_address(self):
56 """ Get the channel's address. By the default, a channel is on
56 """ Get the channel's address. By the default, a channel is on
57 localhost with no port specified (a negative port number).
57 localhost with no port specified (a negative port number).
58 """
58 """
59 return self._address
59 return self._address
60
60
61 def set_adresss(self, address):
61 def set_adresss(self, address):
62 """ Set the channel's address. Should be a tuple of form:
62 """ Set the channel's address. Should be a tuple of form:
63 (ip address [str], port [int]).
63 (ip address [str], port [int]).
64 or None, in which case the address is reset to its default value.
64 or None, in which case the address is reset to its default value.
65 """
65 """
66 # FIXME: Validate address.
66 # FIXME: Validate address.
67 if self.is_alive(): # This is Thread.is_alive
67 if self.is_alive(): # This is Thread.is_alive
68 raise RuntimeError("Cannot set address on a running channel!")
68 raise RuntimeError("Cannot set address on a running channel!")
69 else:
69 else:
70 if address is None:
70 if address is None:
71 address = (LOCALHOST, 0)
71 address = (LOCALHOST, 0)
72 self._address = address
72 self._address = address
73
73
74 address = property(get_address, set_adresss)
74 address = property(get_address, set_adresss)
75
75
76
76
77 class SubSocketChannel(ZmqSocketChannel):
77 class SubSocketChannel(ZmqSocketChannel):
78
78
79 def __init__(self, context, session, address=None):
79 def __init__(self, context, session, address=None):
80 super(SubSocketChannel, self).__init__(context, session, address)
80 super(SubSocketChannel, self).__init__(context, session, address)
81
81
82 def run(self):
82 def run(self):
83 self.socket = self.context.socket(zmq.SUB)
83 self.socket = self.context.socket(zmq.SUB)
84 self.socket.setsockopt(zmq.SUBSCRIBE,'')
84 self.socket.setsockopt(zmq.SUBSCRIBE,'')
85 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
85 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
86 self.socket.connect('tcp://%s:%i' % self.address)
86 self.socket.connect('tcp://%s:%i' % self.address)
87 self.ioloop = ioloop.IOLoop()
87 self.ioloop = ioloop.IOLoop()
88 self.ioloop.add_handler(self.socket, self._handle_events,
88 self.ioloop.add_handler(self.socket, self._handle_events,
89 POLLIN|POLLERR)
89 POLLIN|POLLERR)
90 self.ioloop.start()
90 self.ioloop.start()
91
91
92 def stop(self):
92 def stop(self):
93 self.ioloop.stop()
93 self.ioloop.stop()
94 super(SubSocketChannel, self).stop()
94 super(SubSocketChannel, self).stop()
95
95
96 def _handle_events(self, socket, events):
96 def _handle_events(self, socket, events):
97 # Turn on and off POLLOUT depending on if we have made a request
97 # Turn on and off POLLOUT depending on if we have made a request
98 if events & POLLERR:
98 if events & POLLERR:
99 self._handle_err()
99 self._handle_err()
100 if events & POLLIN:
100 if events & POLLIN:
101 self._handle_recv()
101 self._handle_recv()
102
102
103 def _handle_err(self):
103 def _handle_err(self):
104 # We don't want to let this go silently, so eventually we should log.
104 # We don't want to let this go silently, so eventually we should log.
105 raise zmq.ZmqError()
105 raise zmq.ZMQError()
106
106
107 def _handle_recv(self):
107 def _handle_recv(self):
108 msg = self.socket.recv_json()
108 # Get all of the messages we can
109 self.call_handlers(msg)
109 while True:
110 try:
111 msg = self.socket.recv_json(zmq.NOBLOCK)
112 except zmq.ZMQError:
113 # Check the errno?
114 # Will this tigger POLLERR?
115 break
116 else:
117 self.call_handlers(msg)
110
118
111 def call_handlers(self, msg):
119 def call_handlers(self, msg):
112 """This method is called in the ioloop thread when a message arrives.
120 """This method is called in the ioloop thread when a message arrives.
113
121
114 Subclasses should override this method to handle incoming messages.
122 Subclasses should override this method to handle incoming messages.
115 It is important to remember that this method is called in the thread
123 It is important to remember that this method is called in the thread
116 so that some logic must be done to ensure that the application leve
124 so that some logic must be done to ensure that the application leve
117 handlers are called in the application thread.
125 handlers are called in the application thread.
118 """
126 """
119 raise NotImplementedError('call_handlers must be defined in a subclass.')
127 raise NotImplementedError('call_handlers must be defined in a subclass.')
120
128
121 def flush(self, timeout=1.0):
129 def flush(self, timeout=1.0):
122 """Immediately processes all pending messages on the SUB channel.
130 """Immediately processes all pending messages on the SUB channel.
123
131
124 This method is thread safe.
132 This method is thread safe.
125
133
126 Parameters
134 Parameters
127 ----------
135 ----------
128 timeout : float, optional
136 timeout : float, optional
129 The maximum amount of time to spend flushing, in seconds. The
137 The maximum amount of time to spend flushing, in seconds. The
130 default is one second.
138 default is one second.
131 """
139 """
132 # We do the IOLoop callback process twice to ensure that the IOLoop
140 # We do the IOLoop callback process twice to ensure that the IOLoop
133 # gets to perform at least one full poll.
141 # gets to perform at least one full poll.
134 stop_time = time.time() + timeout
142 stop_time = time.time() + timeout
135 for i in xrange(2):
143 for i in xrange(2):
136 self._flushed = False
144 self._flushed = False
137 self.ioloop.add_callback(self._flush)
145 self.ioloop.add_callback(self._flush)
138 while not self._flushed and time.time() < stop_time:
146 while not self._flushed and time.time() < stop_time:
139 time.sleep(0.01)
147 time.sleep(0.01)
140
148
141 def _flush(self):
149 def _flush(self):
142 """Called in this thread by the IOLoop to indicate that all events have
150 """Called in this thread by the IOLoop to indicate that all events have
143 been processed.
151 been processed.
144 """
152 """
145 self._flushed = True
153 self._flushed = True
146
154
147
155
148 class XReqSocketChannel(ZmqSocketChannel):
156 class XReqSocketChannel(ZmqSocketChannel):
149
157
150 handler_queue = None
158 handler_queue = None
151 command_queue = None
159 command_queue = None
152 handlers = None
160 handlers = None
153 _overriden_call_handler = None
161 _overriden_call_handler = None
154
162
155 def __init__(self, context, session, address=None):
163 def __init__(self, context, session, address=None):
156 self.handlers = {}
164 self.handlers = {}
157 self.handler_queue = Queue()
165 self.handler_queue = Queue()
158 self.command_queue = Queue()
166 self.command_queue = Queue()
159 super(XReqSocketChannel, self).__init__(context, session, address)
167 super(XReqSocketChannel, self).__init__(context, session, address)
160
168
161 def run(self):
169 def run(self):
162 self.socket = self.context.socket(zmq.XREQ)
170 self.socket = self.context.socket(zmq.XREQ)
163 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
171 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
164 self.socket.connect('tcp://%s:%i' % self.address)
172 self.socket.connect('tcp://%s:%i' % self.address)
165 self.ioloop = ioloop.IOLoop()
173 self.ioloop = ioloop.IOLoop()
166 self.ioloop.add_handler(self.socket, self._handle_events,
174 self.ioloop.add_handler(self.socket, self._handle_events,
167 POLLIN|POLLOUT|POLLERR)
175 POLLIN|POLLOUT|POLLERR)
168 self.ioloop.start()
176 self.ioloop.start()
169
177
170 def stop(self):
178 def stop(self):
171 self.ioloop.stop()
179 self.ioloop.stop()
172 super(XReqSocketChannel, self).stop()
180 super(XReqSocketChannel, self).stop()
173
181
174 def _handle_events(self, socket, events):
182 def _handle_events(self, socket, events):
175 # Turn on and off POLLOUT depending on if we have made a request
183 # Turn on and off POLLOUT depending on if we have made a request
176 if events & POLLERR:
184 if events & POLLERR:
177 self._handle_err()
185 self._handle_err()
178 if events & POLLOUT:
186 if events & POLLOUT:
179 self._handle_send()
187 self._handle_send()
180 if events & POLLIN:
188 if events & POLLIN:
181 self._handle_recv()
189 self._handle_recv()
182
190
183 def _handle_recv(self):
191 def _handle_recv(self):
184 msg = self.socket.recv_json()
192 msg = self.socket.recv_json()
185 self.call_handlers(msg)
193 self.call_handlers(msg)
186
194
187 def _handle_send(self):
195 def _handle_send(self):
188 try:
196 try:
189 msg = self.command_queue.get(False)
197 msg = self.command_queue.get(False)
190 except Empty:
198 except Empty:
191 pass
199 pass
192 else:
200 else:
193 self.socket.send_json(msg)
201 self.socket.send_json(msg)
194
202
195 def _handle_err(self):
203 def _handle_err(self):
196 # We don't want to let this go silently, so eventually we should log.
204 # We don't want to let this go silently, so eventually we should log.
197 raise zmq.ZmqError()
205 raise zmq.ZMQError()
198
206
199 def _queue_request(self, msg, callback):
207 def _queue_request(self, msg, callback):
200 handler = self._find_handler(msg['msg_type'], callback)
208 handler = self._find_handler(msg['msg_type'], callback)
201 self.handler_queue.put(handler)
209 self.handler_queue.put(handler)
202 self.command_queue.put(msg)
210 self.command_queue.put(msg)
203
211
204 def execute(self, code, callback=None):
212 def execute(self, code, callback=None):
205 # Create class for content/msg creation. Related to, but possibly
213 # Create class for content/msg creation. Related to, but possibly
206 # not in Session.
214 # not in Session.
207 content = dict(code=code)
215 content = dict(code=code)
208 msg = self.session.msg('execute_request', content)
216 msg = self.session.msg('execute_request', content)
209 self._queue_request(msg, callback)
217 self._queue_request(msg, callback)
210 return msg['header']['msg_id']
218 return msg['header']['msg_id']
211
219
212 def complete(self, text, line, block=None, callback=None):
220 def complete(self, text, line, block=None, callback=None):
213 content = dict(text=text, line=line)
221 content = dict(text=text, line=line)
214 msg = self.session.msg('complete_request', content)
222 msg = self.session.msg('complete_request', content)
215 self._queue_request(msg, callback)
223 self._queue_request(msg, callback)
216 return msg['header']['msg_id']
224 return msg['header']['msg_id']
217
225
218 def object_info(self, oname, callback=None):
226 def object_info(self, oname, callback=None):
219 content = dict(oname=oname)
227 content = dict(oname=oname)
220 msg = self.session.msg('object_info_request', content)
228 msg = self.session.msg('object_info_request', content)
221 self._queue_request(msg, callback)
229 self._queue_request(msg, callback)
222 return msg['header']['msg_id']
230 return msg['header']['msg_id']
223
231
224 def _find_handler(self, name, callback):
232 def _find_handler(self, name, callback):
225 if callback is not None:
233 if callback is not None:
226 return callback
234 return callback
227 handler = self.handlers.get(name)
235 handler = self.handlers.get(name)
228 if handler is None:
236 if handler is None:
229 raise MissingHandlerError(
237 raise MissingHandlerError(
230 'No handler defined for method: %s' % name)
238 'No handler defined for method: %s' % name)
231 return handler
239 return handler
232
240
233 def override_call_handler(self, func):
241 def override_call_handler(self, func):
234 """Permanently override the call_handler.
242 """Permanently override the call_handler.
235
243
236 The function func will be called as::
244 The function func will be called as::
237
245
238 func(handler, msg)
246 func(handler, msg)
239
247
240 And must call::
248 And must call::
241
249
242 handler(msg)
250 handler(msg)
243
251
244 in the main thread.
252 in the main thread.
245 """
253 """
246 assert callable(func), "not a callable: %r" % func
254 assert callable(func), "not a callable: %r" % func
247 self._overriden_call_handler = func
255 self._overriden_call_handler = func
248
256
249 def call_handlers(self, msg):
257 def call_handlers(self, msg):
250 try:
258 try:
251 handler = self.handler_queue.get(False)
259 handler = self.handler_queue.get(False)
252 except Empty:
260 except Empty:
253 print "Message received with no handler!!!"
261 print "Message received with no handler!!!"
254 print msg
262 print msg
255 else:
263 else:
256 self.call_handler(handler, msg)
264 self.call_handler(handler, msg)
257
265
258 def call_handler(self, handler, msg):
266 def call_handler(self, handler, msg):
259 if self._overriden_call_handler is not None:
267 if self._overriden_call_handler is not None:
260 self._overriden_call_handler(handler, msg)
268 self._overriden_call_handler(handler, msg)
261 elif hasattr(self, '_call_handler'):
269 elif hasattr(self, '_call_handler'):
262 call_handler = getattr(self, '_call_handler')
270 call_handler = getattr(self, '_call_handler')
263 call_handler(handler, msg)
271 call_handler(handler, msg)
264 else:
272 else:
265 raise RuntimeError('no handler!')
273 raise RuntimeError('no handler!')
266
274
267
275
268 class RepSocketChannel(ZmqSocketChannel):
276 class RepSocketChannel(ZmqSocketChannel):
269
277
270 def on_raw_input(self):
278 def on_raw_input(self):
271 pass
279 pass
272
280
273
281
274 class KernelManager(HasTraits):
282 class KernelManager(HasTraits):
275 """ Manages a kernel for a frontend.
283 """ Manages a kernel for a frontend.
276
284
277 The SUB channel is for the frontend to receive messages published by the
285 The SUB channel is for the frontend to receive messages published by the
278 kernel.
286 kernel.
279
287
280 The REQ channel is for the frontend to make requests of the kernel.
288 The REQ channel is for the frontend to make requests of the kernel.
281
289
282 The REP channel is for the kernel to request stdin (raw_input) from the
290 The REP channel is for the kernel to request stdin (raw_input) from the
283 frontend.
291 frontend.
284 """
292 """
285
293
286 # Whether the kernel manager is currently listening on its channels.
294 # Whether the kernel manager is currently listening on its channels.
287 is_listening = Bool(False)
295 is_listening = Bool(False)
288
296
289 # The PyZMQ Context to use for communication with the kernel.
297 # The PyZMQ Context to use for communication with the kernel.
290 context = Instance(zmq.Context, ())
298 context = Instance(zmq.Context, ())
291
299
292 # The Session to use for communication with the kernel.
300 # The Session to use for communication with the kernel.
293 session = Instance(Session, ())
301 session = Instance(Session, ())
294
302
295 # The classes to use for the various channels.
303 # The classes to use for the various channels.
296 sub_channel_class = Type(SubSocketChannel)
304 sub_channel_class = Type(SubSocketChannel)
297 xreq_channel_class = Type(XReqSocketChannel)
305 xreq_channel_class = Type(XReqSocketChannel)
298 rep_channel_class = Type(RepSocketChannel)
306 rep_channel_class = Type(RepSocketChannel)
299
307
300 # Protected traits.
308 # Protected traits.
301 _kernel = Instance(Popen)
309 _kernel = Instance(Popen)
302 _sub_channel = Any
310 _sub_channel = Any
303 _xreq_channel = Any
311 _xreq_channel = Any
304 _rep_channel = Any
312 _rep_channel = Any
305
313
306 #--------------------------------------------------------------------------
314 #--------------------------------------------------------------------------
307 # Channel management methods:
315 # Channel management methods:
308 #--------------------------------------------------------------------------
316 #--------------------------------------------------------------------------
309
317
310 def start_listening(self):
318 def start_listening(self):
311 """Starts listening on the specified ports. If already listening, raises
319 """Starts listening on the specified ports. If already listening, raises
312 a RuntimeError.
320 a RuntimeError.
313 """
321 """
314 if self.is_listening:
322 if self.is_listening:
315 raise RuntimeError("Cannot start listening. Already listening!")
323 raise RuntimeError("Cannot start listening. Already listening!")
316 else:
324 else:
317 self.is_listening = True
325 self.is_listening = True
318 self.sub_channel.start()
326 self.sub_channel.start()
319 self.xreq_channel.start()
327 self.xreq_channel.start()
320 self.rep_channel.start()
328 self.rep_channel.start()
321
329
322 @property
330 @property
323 def is_alive(self):
331 def is_alive(self):
324 """ Returns whether the kernel is alive. """
332 """ Returns whether the kernel is alive. """
325 if self.is_listening:
333 if self.is_listening:
326 # TODO: check if alive.
334 # TODO: check if alive.
327 return True
335 return True
328 else:
336 else:
329 return False
337 return False
330
338
331 def stop_listening(self):
339 def stop_listening(self):
332 """Stops listening. If not listening, does nothing. """
340 """Stops listening. If not listening, does nothing. """
333 if self.is_listening:
341 if self.is_listening:
334 self.is_listening = False
342 self.is_listening = False
335 self.sub_channel.stop()
343 self.sub_channel.stop()
336 self.xreq_channel.stop()
344 self.xreq_channel.stop()
337 self.rep_channel.stop()
345 self.rep_channel.stop()
338
346
339 #--------------------------------------------------------------------------
347 #--------------------------------------------------------------------------
340 # Kernel process management methods:
348 # Kernel process management methods:
341 #--------------------------------------------------------------------------
349 #--------------------------------------------------------------------------
342
350
343 def start_kernel(self):
351 def start_kernel(self):
344 """Starts a kernel process and configures the manager to use it.
352 """Starts a kernel process and configures the manager to use it.
345
353
346 If ports have been specified via the address attributes, they are used.
354 If ports have been specified via the address attributes, they are used.
347 Otherwise, open ports are chosen by the OS and the channel port
355 Otherwise, open ports are chosen by the OS and the channel port
348 attributes are configured as appropriate.
356 attributes are configured as appropriate.
349 """
357 """
350 xreq, sub = self.xreq_address, self.sub_address
358 xreq, sub = self.xreq_address, self.sub_address
351 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
359 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST:
352 raise RuntimeError("Can only launch a kernel on localhost."
360 raise RuntimeError("Can only launch a kernel on localhost."
353 "Make sure that the '*_address' attributes are "
361 "Make sure that the '*_address' attributes are "
354 "configured properly.")
362 "configured properly.")
355
363
356 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
364 kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1])
357 self.set_kernel(kernel)
365 self.set_kernel(kernel)
358 self.xreq_address = (LOCALHOST, xrep)
366 self.xreq_address = (LOCALHOST, xrep)
359 self.sub_address = (LOCALHOST, pub)
367 self.sub_address = (LOCALHOST, pub)
360
368
361 def set_kernel(self, kernel):
369 def set_kernel(self, kernel):
362 """Sets the kernel manager's kernel to an existing kernel process.
370 """Sets the kernel manager's kernel to an existing kernel process.
363
371
364 It is *not* necessary to a set a kernel to communicate with it via the
372 It is *not* necessary to a set a kernel to communicate with it via the
365 channels, and those objects must be configured separately. It
373 channels, and those objects must be configured separately. It
366 *is* necessary to set a kernel if you want to use the manager (or
374 *is* necessary to set a kernel if you want to use the manager (or
367 frontends that use the manager) to signal and/or kill the kernel.
375 frontends that use the manager) to signal and/or kill the kernel.
368
376
369 Parameters:
377 Parameters:
370 -----------
378 -----------
371 kernel : Popen
379 kernel : Popen
372 An existing kernel process.
380 An existing kernel process.
373 """
381 """
374 self._kernel = kernel
382 self._kernel = kernel
375
383
376 @property
384 @property
377 def has_kernel(self):
385 def has_kernel(self):
378 """Returns whether a kernel process has been specified for the kernel
386 """Returns whether a kernel process has been specified for the kernel
379 manager.
387 manager.
380
388
381 A kernel process can be set via 'start_kernel' or 'set_kernel'.
389 A kernel process can be set via 'start_kernel' or 'set_kernel'.
382 """
390 """
383 return self._kernel is not None
391 return self._kernel is not None
384
392
385 def kill_kernel(self):
393 def kill_kernel(self):
386 """ Kill the running kernel. """
394 """ Kill the running kernel. """
387 if self._kernel:
395 if self._kernel:
388 self._kernel.kill()
396 self._kernel.kill()
389 self._kernel = None
397 self._kernel = None
390 else:
398 else:
391 raise RuntimeError("Cannot kill kernel. No kernel is running!")
399 raise RuntimeError("Cannot kill kernel. No kernel is running!")
392
400
393 def signal_kernel(self, signum):
401 def signal_kernel(self, signum):
394 """ Sends a signal to the kernel. """
402 """ Sends a signal to the kernel. """
395 if self._kernel:
403 if self._kernel:
396 self._kernel.send_signal(signum)
404 self._kernel.send_signal(signum)
397 else:
405 else:
398 raise RuntimeError("Cannot signal kernel. No kernel is running!")
406 raise RuntimeError("Cannot signal kernel. No kernel is running!")
399
407
400 #--------------------------------------------------------------------------
408 #--------------------------------------------------------------------------
401 # Channels used for communication with the kernel:
409 # Channels used for communication with the kernel:
402 #--------------------------------------------------------------------------
410 #--------------------------------------------------------------------------
403
411
404 @property
412 @property
405 def sub_channel(self):
413 def sub_channel(self):
406 """Get the SUB socket channel object."""
414 """Get the SUB socket channel object."""
407 if self._sub_channel is None:
415 if self._sub_channel is None:
408 self._sub_channel = self.sub_channel_class(self.context,
416 self._sub_channel = self.sub_channel_class(self.context,
409 self.session)
417 self.session)
410 return self._sub_channel
418 return self._sub_channel
411
419
412 @property
420 @property
413 def xreq_channel(self):
421 def xreq_channel(self):
414 """Get the REQ socket channel object to make requests of the kernel."""
422 """Get the REQ socket channel object to make requests of the kernel."""
415 if self._xreq_channel is None:
423 if self._xreq_channel is None:
416 self._xreq_channel = self.xreq_channel_class(self.context,
424 self._xreq_channel = self.xreq_channel_class(self.context,
417 self.session)
425 self.session)
418 return self._xreq_channel
426 return self._xreq_channel
419
427
420 @property
428 @property
421 def rep_channel(self):
429 def rep_channel(self):
422 """Get the REP socket channel object to handle stdin (raw_input)."""
430 """Get the REP socket channel object to handle stdin (raw_input)."""
423 if self._rep_channel is None:
431 if self._rep_channel is None:
424 self._rep_channel = self.rep_channel_class(self.context,
432 self._rep_channel = self.rep_channel_class(self.context,
425 self.session)
433 self.session)
426 return self._rep_channel
434 return self._rep_channel
427
435
428 #--------------------------------------------------------------------------
436 #--------------------------------------------------------------------------
429 # Delegates for the Channel address attributes:
437 # Delegates for the Channel address attributes:
430 #--------------------------------------------------------------------------
438 #--------------------------------------------------------------------------
431
439
432 def get_sub_address(self):
440 def get_sub_address(self):
433 return self.sub_channel.address
441 return self.sub_channel.address
434
442
435 def set_sub_address(self, address):
443 def set_sub_address(self, address):
436 self.sub_channel.address = address
444 self.sub_channel.address = address
437
445
438 sub_address = property(get_sub_address, set_sub_address,
446 sub_address = property(get_sub_address, set_sub_address,
439 doc="The address used by SUB socket channel.")
447 doc="The address used by SUB socket channel.")
440
448
441 def get_xreq_address(self):
449 def get_xreq_address(self):
442 return self.xreq_channel.address
450 return self.xreq_channel.address
443
451
444 def set_xreq_address(self, address):
452 def set_xreq_address(self, address):
445 self.xreq_channel.address = address
453 self.xreq_channel.address = address
446
454
447 xreq_address = property(get_xreq_address, set_xreq_address,
455 xreq_address = property(get_xreq_address, set_xreq_address,
448 doc="The address used by XREQ socket channel.")
456 doc="The address used by XREQ socket channel.")
449
457
450 def get_rep_address(self):
458 def get_rep_address(self):
451 return self.rep_channel.address
459 return self.rep_channel.address
452
460
453 def set_rep_address(self, address):
461 def set_rep_address(self, address):
454 self.rep_channel.address = address
462 self.rep_channel.address = address
455
463
456 rep_address = property(get_rep_address, set_rep_address,
464 rep_address = property(get_rep_address, set_rep_address,
457 doc="The address used by REP socket channel.")
465 doc="The address used by REP socket channel.")
458
466
General Comments 0
You need to be logged in to leave comments. Login now