Show More
@@ -28,14 +28,37 b' class ZmqSocketChannel(Thread):' | |||
|
28 | 28 | """ The base class for the channels that use ZMQ sockets. |
|
29 | 29 | """ |
|
30 | 30 | |
|
31 | def __init__(self, context, session, addr=None): | |
|
31 | def __init__(self, context, session, address=None): | |
|
32 | super(ZmqSocketChannel, self).__init__() | |
|
33 | self.daemon = True | |
|
34 | ||
|
32 | 35 | self.context = context |
|
33 | 36 | self.session = session |
|
34 | self.addr = addr | |
|
37 | self.address = address | |
|
35 | 38 | self.socket = None |
|
36 | 39 | |
|
37 | super(ZmqSocketChannel, self).__init__() | |
|
38 | self.daemon = True | |
|
40 | def stop(self): | |
|
41 | """ Stop the thread's activity. Returns when the thread terminates. | |
|
42 | """ | |
|
43 | raise NotImplementedError | |
|
44 | ||
|
45 | def get_address(self): | |
|
46 | """ Get the channel's address. | |
|
47 | """ | |
|
48 | return self._address | |
|
49 | ||
|
50 | def set_adresss(self, address): | |
|
51 | """ Set the channel's address. Should be a tuple of form: | |
|
52 | (ip address [str], port [int]) | |
|
53 | or 'None' to indicate that no address has been specified. | |
|
54 | """ | |
|
55 | # FIXME: Validate address. | |
|
56 | if self.is_alive(): | |
|
57 | raise RuntimeError("Cannot set address on a running channel!") | |
|
58 | else: | |
|
59 | self._address = address | |
|
60 | ||
|
61 | address = property(get_address, set_adresss) | |
|
39 | 62 | |
|
40 | 63 | |
|
41 | 64 | class SubSocketChannel(ZmqSocketChannel): |
@@ -43,20 +66,24 b' class SubSocketChannel(ZmqSocketChannel):' | |||
|
43 | 66 | handlers = None |
|
44 | 67 | _overriden_call_handler = None |
|
45 | 68 | |
|
46 | def __init__(self, context, session, addr=None): | |
|
69 | def __init__(self, context, session, address=None): | |
|
47 | 70 | self.handlers = {} |
|
48 | super(SubSocketChannel, self).__init__(context, session, addr) | |
|
71 | super(SubSocketChannel, self).__init__(context, session, address) | |
|
49 | 72 | |
|
50 | 73 | def run(self): |
|
51 | 74 | self.socket = self.context.socket(zmq.SUB) |
|
52 | 75 | self.socket.setsockopt(zmq.SUBSCRIBE,'') |
|
53 | 76 | self.socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
54 | self.socket.connect('tcp://%s:%i' % self.addr) | |
|
77 | self.socket.connect('tcp://%s:%i' % self.address) | |
|
55 | 78 | self.ioloop = ioloop.IOLoop() |
|
56 | 79 | self.ioloop.add_handler(self.socket, self._handle_events, |
|
57 | 80 | POLLIN|POLLERR) |
|
58 | 81 | self.ioloop.start() |
|
59 | 82 | |
|
83 | def stop(self): | |
|
84 | self.ioloop.stop() | |
|
85 | self.join() | |
|
86 | ||
|
60 | 87 | def _handle_events(self, socket, events): |
|
61 | 88 | # Turn on and off POLLOUT depending on if we have made a request |
|
62 | 89 | if events & POLLERR: |
@@ -136,21 +163,25 b' class XReqSocketChannel(ZmqSocketChannel):' | |||
|
136 | 163 | handlers = None |
|
137 | 164 | _overriden_call_handler = None |
|
138 | 165 | |
|
139 | def __init__(self, context, session, addr=None): | |
|
166 | def __init__(self, context, session, address=None): | |
|
140 | 167 | self.handlers = {} |
|
141 | 168 | self.handler_queue = Queue() |
|
142 | 169 | self.command_queue = Queue() |
|
143 | super(XReqSocketChannel, self).__init__(context, session, addr) | |
|
170 | super(XReqSocketChannel, self).__init__(context, session, address) | |
|
144 | 171 | |
|
145 | 172 | def run(self): |
|
146 | 173 | self.socket = self.context.socket(zmq.XREQ) |
|
147 | 174 | self.socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
148 | self.socket.connect('tcp://%s:%i' % self.addr) | |
|
175 | self.socket.connect('tcp://%s:%i' % self.address) | |
|
149 | 176 | self.ioloop = ioloop.IOLoop() |
|
150 | 177 | self.ioloop.add_handler(self.socket, self._handle_events, |
|
151 | 178 | POLLIN|POLLOUT|POLLERR) |
|
152 | 179 | self.ioloop.start() |
|
153 | 180 | |
|
181 | def stop(self): | |
|
182 | self.ioloop.stop() | |
|
183 | self.join() | |
|
184 | ||
|
154 | 185 | def _handle_events(self, socket, events): |
|
155 | 186 | # Turn on and off POLLOUT depending on if we have made a request |
|
156 | 187 | if events & POLLERR: |
@@ -246,7 +277,10 b' class XReqSocketChannel(ZmqSocketChannel):' | |||
|
246 | 277 | |
|
247 | 278 | class RepSocketChannel(ZmqSocketChannel): |
|
248 | 279 | |
|
249 |
def o |
|
|
280 | def stop(self): | |
|
281 | pass | |
|
282 | ||
|
283 | def on_raw_input(self): | |
|
250 | 284 | pass |
|
251 | 285 | |
|
252 | 286 | |
@@ -268,22 +302,10 b' class KernelManager(HasTraits):' | |||
|
268 | 302 | # The Session to use for communication with the kernel. |
|
269 | 303 | session = Instance(Session, ()) |
|
270 | 304 | |
|
271 | # The channels objects used for communication with the kernel. | |
|
272 | # FIXME: Add '_traitname_default' instantiation method to Traitlets. | |
|
273 | #sub_channel = Instance(SubSocketChannel) | |
|
274 | #xreq_channel = Instance(XReqSocketChannel) | |
|
275 | #rep_channel = Instance(RepSocketChannel) | |
|
276 | ||
|
277 | 305 | # The classes to use for the various channels. |
|
278 | 306 | sub_channel_class = Type(SubSocketChannel) |
|
279 | 307 | xreq_channel_class = Type(XReqSocketChannel) |
|
280 | 308 | rep_channel_class = Type(RepSocketChannel) |
|
281 | ||
|
282 | # The addresses to use for the various channels. Should be tuples of form | |
|
283 | # (ip_address, port). | |
|
284 | #sub_address = DelegatesTo('sub_channel') | |
|
285 | #xreq_address = DelegatesTo('xreq_channel') | |
|
286 | #rep_address = DelegatesTo('rep_channel') | |
|
287 | 309 | |
|
288 | 310 | # Protected traits. |
|
289 | 311 | _sub_channel = Any |
@@ -302,9 +324,16 b' class KernelManager(HasTraits):' | |||
|
302 | 324 | """Start a localhost kernel. If ports have been specified, use |
|
303 | 325 | them. Otherwise, choose an open port at random. |
|
304 | 326 | """ |
|
327 | self.sub_channel.start() | |
|
328 | self.xreq_channel.start() | |
|
329 | self.rep_channel.start() | |
|
305 | 330 | |
|
306 | 331 | def kill_kernel(self): |
|
307 |
"""Kill the running kernel |
|
|
332 | """Kill the running kernel. | |
|
333 | """ | |
|
334 | self.sub_channel.stop() | |
|
335 | self.xreq_channel.stop() | |
|
336 | self.rep_channel.stop() | |
|
308 | 337 | |
|
309 | 338 | def is_alive(self): |
|
310 | 339 | """Is the kernel alive?""" |
@@ -313,6 +342,10 b' class KernelManager(HasTraits):' | |||
|
313 | 342 | def signal_kernel(self, signum): |
|
314 | 343 | """Send signum to the kernel.""" |
|
315 | 344 | |
|
345 | #-------------------------------------------------------------------------- | |
|
346 | # Channels used for communication with the kernel: | |
|
347 | #-------------------------------------------------------------------------- | |
|
348 | ||
|
316 | 349 | @property |
|
317 | 350 | def sub_channel(self): |
|
318 | 351 | """Get the SUB socket channel object.""" |
@@ -337,23 +370,34 b' class KernelManager(HasTraits):' | |||
|
337 | 370 | self.session) |
|
338 | 371 | return self._rep_channel |
|
339 | 372 | |
|
373 | #-------------------------------------------------------------------------- | |
|
374 | # Channel addresses: | |
|
375 | #-------------------------------------------------------------------------- | |
|
376 | ||
|
340 | 377 | def get_sub_address(self): |
|
341 | return self.sub_channel.addr | |
|
342 | def set_sub_address(self, addr): | |
|
343 | self.sub_channel.addr = addr | |
|
378 | return self.sub_channel.address | |
|
379 | ||
|
380 | def set_sub_address(self, address): | |
|
381 | self.sub_channel.address = address | |
|
382 | ||
|
344 | 383 | sub_address = property(get_sub_address, set_sub_address, |
|
345 | 384 | doc="The address used by SUB socket channel.") |
|
346 | 385 | |
|
347 | 386 | def get_xreq_address(self): |
|
348 | return self.xreq_channel.addr | |
|
349 | def set_xreq_address(self, addr): | |
|
350 | self.xreq_channel.addr = addr | |
|
387 | return self.xreq_channel.address | |
|
388 | ||
|
389 | def set_xreq_address(self, address): | |
|
390 | self.xreq_channel.address = address | |
|
391 | ||
|
351 | 392 | xreq_address = property(get_xreq_address, set_xreq_address, |
|
352 | 393 | doc="The address used by XREQ socket channel.") |
|
353 | 394 | |
|
354 | 395 | def get_rep_address(self): |
|
355 | return self.rep_channel.addr | |
|
356 | def set_rep_address(self, addr): | |
|
357 | self.rep_channel.addr = addr | |
|
396 | return self.rep_channel.address | |
|
397 | ||
|
398 | def set_rep_address(self, address): | |
|
399 | self.rep_channel.address = address | |
|
400 | ||
|
358 | 401 | rep_address = property(get_rep_address, set_rep_address, |
|
359 | 402 | doc="The address used by REP socket channel.") |
|
403 |
General Comments 0
You need to be logged in to leave comments.
Login now