##// END OF EJS Templates
* Added 'stop' methods to the ZmqSocketChannels...
epatters -
Show More
@@ -28,14 +28,37 b' class ZmqSocketChannel(Thread):'
28 28 """ The base class for the channels that use ZMQ sockets.
29 29 """
30 30
31 def __init__(self, context, session, addr=None):
31 def __init__(self, context, session, address=None):
32 super(ZmqSocketChannel, self).__init__()
33 self.daemon = True
34
32 35 self.context = context
33 36 self.session = session
34 self.addr = addr
37 self.address = address
35 38 self.socket = None
36 39
37 super(ZmqSocketChannel, self).__init__()
38 self.daemon = True
40 def stop(self):
41 """ Stop the thread's activity. Returns when the thread terminates.
42 """
43 raise NotImplementedError
44
45 def get_address(self):
46 """ Get the channel's address.
47 """
48 return self._address
49
50 def set_adresss(self, address):
51 """ Set the channel's address. Should be a tuple of form:
52 (ip address [str], port [int])
53 or 'None' to indicate that no address has been specified.
54 """
55 # FIXME: Validate address.
56 if self.is_alive():
57 raise RuntimeError("Cannot set address on a running channel!")
58 else:
59 self._address = address
60
61 address = property(get_address, set_adresss)
39 62
40 63
41 64 class SubSocketChannel(ZmqSocketChannel):
@@ -43,20 +66,24 b' class SubSocketChannel(ZmqSocketChannel):'
43 66 handlers = None
44 67 _overriden_call_handler = None
45 68
46 def __init__(self, context, session, addr=None):
69 def __init__(self, context, session, address=None):
47 70 self.handlers = {}
48 super(SubSocketChannel, self).__init__(context, session, addr)
71 super(SubSocketChannel, self).__init__(context, session, address)
49 72
50 73 def run(self):
51 74 self.socket = self.context.socket(zmq.SUB)
52 75 self.socket.setsockopt(zmq.SUBSCRIBE,'')
53 76 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
54 self.socket.connect('tcp://%s:%i' % self.addr)
77 self.socket.connect('tcp://%s:%i' % self.address)
55 78 self.ioloop = ioloop.IOLoop()
56 79 self.ioloop.add_handler(self.socket, self._handle_events,
57 80 POLLIN|POLLERR)
58 81 self.ioloop.start()
59 82
83 def stop(self):
84 self.ioloop.stop()
85 self.join()
86
60 87 def _handle_events(self, socket, events):
61 88 # Turn on and off POLLOUT depending on if we have made a request
62 89 if events & POLLERR:
@@ -136,21 +163,25 b' class XReqSocketChannel(ZmqSocketChannel):'
136 163 handlers = None
137 164 _overriden_call_handler = None
138 165
139 def __init__(self, context, session, addr=None):
166 def __init__(self, context, session, address=None):
140 167 self.handlers = {}
141 168 self.handler_queue = Queue()
142 169 self.command_queue = Queue()
143 super(XReqSocketChannel, self).__init__(context, session, addr)
170 super(XReqSocketChannel, self).__init__(context, session, address)
144 171
145 172 def run(self):
146 173 self.socket = self.context.socket(zmq.XREQ)
147 174 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
148 self.socket.connect('tcp://%s:%i' % self.addr)
175 self.socket.connect('tcp://%s:%i' % self.address)
149 176 self.ioloop = ioloop.IOLoop()
150 177 self.ioloop.add_handler(self.socket, self._handle_events,
151 178 POLLIN|POLLOUT|POLLERR)
152 179 self.ioloop.start()
153 180
181 def stop(self):
182 self.ioloop.stop()
183 self.join()
184
154 185 def _handle_events(self, socket, events):
155 186 # Turn on and off POLLOUT depending on if we have made a request
156 187 if events & POLLERR:
@@ -246,7 +277,10 b' class XReqSocketChannel(ZmqSocketChannel):'
246 277
247 278 class RepSocketChannel(ZmqSocketChannel):
248 279
249 def on_raw_input():
280 def stop(self):
281 pass
282
283 def on_raw_input(self):
250 284 pass
251 285
252 286
@@ -268,22 +302,10 b' class KernelManager(HasTraits):'
268 302 # The Session to use for communication with the kernel.
269 303 session = Instance(Session, ())
270 304
271 # The channels objects used for communication with the kernel.
272 # FIXME: Add '_traitname_default' instantiation method to Traitlets.
273 #sub_channel = Instance(SubSocketChannel)
274 #xreq_channel = Instance(XReqSocketChannel)
275 #rep_channel = Instance(RepSocketChannel)
276
277 305 # The classes to use for the various channels.
278 306 sub_channel_class = Type(SubSocketChannel)
279 307 xreq_channel_class = Type(XReqSocketChannel)
280 308 rep_channel_class = Type(RepSocketChannel)
281
282 # The addresses to use for the various channels. Should be tuples of form
283 # (ip_address, port).
284 #sub_address = DelegatesTo('sub_channel')
285 #xreq_address = DelegatesTo('xreq_channel')
286 #rep_address = DelegatesTo('rep_channel')
287 309
288 310 # Protected traits.
289 311 _sub_channel = Any
@@ -302,9 +324,16 b' class KernelManager(HasTraits):'
302 324 """Start a localhost kernel. If ports have been specified, use
303 325 them. Otherwise, choose an open port at random.
304 326 """
327 self.sub_channel.start()
328 self.xreq_channel.start()
329 self.rep_channel.start()
305 330
306 331 def kill_kernel(self):
307 """Kill the running kernel"""
332 """Kill the running kernel.
333 """
334 self.sub_channel.stop()
335 self.xreq_channel.stop()
336 self.rep_channel.stop()
308 337
309 338 def is_alive(self):
310 339 """Is the kernel alive?"""
@@ -313,6 +342,10 b' class KernelManager(HasTraits):'
313 342 def signal_kernel(self, signum):
314 343 """Send signum to the kernel."""
315 344
345 #--------------------------------------------------------------------------
346 # Channels used for communication with the kernel:
347 #--------------------------------------------------------------------------
348
316 349 @property
317 350 def sub_channel(self):
318 351 """Get the SUB socket channel object."""
@@ -337,23 +370,34 b' class KernelManager(HasTraits):'
337 370 self.session)
338 371 return self._rep_channel
339 372
373 #--------------------------------------------------------------------------
374 # Channel addresses:
375 #--------------------------------------------------------------------------
376
340 377 def get_sub_address(self):
341 return self.sub_channel.addr
342 def set_sub_address(self, addr):
343 self.sub_channel.addr = addr
378 return self.sub_channel.address
379
380 def set_sub_address(self, address):
381 self.sub_channel.address = address
382
344 383 sub_address = property(get_sub_address, set_sub_address,
345 384 doc="The address used by SUB socket channel.")
346 385
347 386 def get_xreq_address(self):
348 return self.xreq_channel.addr
349 def set_xreq_address(self, addr):
350 self.xreq_channel.addr = addr
387 return self.xreq_channel.address
388
389 def set_xreq_address(self, address):
390 self.xreq_channel.address = address
391
351 392 xreq_address = property(get_xreq_address, set_xreq_address,
352 393 doc="The address used by XREQ socket channel.")
353 394
354 395 def get_rep_address(self):
355 return self.rep_channel.addr
356 def set_rep_address(self, addr):
357 self.rep_channel.addr = addr
396 return self.rep_channel.address
397
398 def set_rep_address(self, address):
399 self.rep_channel.address = address
400
358 401 rep_address = property(get_rep_address, set_rep_address,
359 402 doc="The address used by REP socket channel.")
403
General Comments 0
You need to be logged in to leave comments. Login now