##// END OF EJS Templates
* Added 'stop' methods to the ZmqSocketChannels...
epatters -
Show More
@@ -28,14 +28,37 b' class ZmqSocketChannel(Thread):'
28 """ The base class for the channels that use ZMQ sockets.
28 """ The base class for the channels that use ZMQ sockets.
29 """
29 """
30
30
31 def __init__(self, context, session, addr=None):
31 def __init__(self, context, session, address=None):
32 super(ZmqSocketChannel, self).__init__()
33 self.daemon = True
34
32 self.context = context
35 self.context = context
33 self.session = session
36 self.session = session
34 self.addr = addr
37 self.address = address
35 self.socket = None
38 self.socket = None
36
39
37 super(ZmqSocketChannel, self).__init__()
40 def stop(self):
38 self.daemon = True
41 """ Stop the thread's activity. Returns when the thread terminates.
42 """
43 raise NotImplementedError
44
45 def get_address(self):
46 """ Get the channel's address.
47 """
48 return self._address
49
50 def set_adresss(self, address):
51 """ Set the channel's address. Should be a tuple of form:
52 (ip address [str], port [int])
53 or 'None' to indicate that no address has been specified.
54 """
55 # FIXME: Validate address.
56 if self.is_alive():
57 raise RuntimeError("Cannot set address on a running channel!")
58 else:
59 self._address = address
60
61 address = property(get_address, set_adresss)
39
62
40
63
41 class SubSocketChannel(ZmqSocketChannel):
64 class SubSocketChannel(ZmqSocketChannel):
@@ -43,20 +66,24 b' class SubSocketChannel(ZmqSocketChannel):'
43 handlers = None
66 handlers = None
44 _overriden_call_handler = None
67 _overriden_call_handler = None
45
68
46 def __init__(self, context, session, addr=None):
69 def __init__(self, context, session, address=None):
47 self.handlers = {}
70 self.handlers = {}
48 super(SubSocketChannel, self).__init__(context, session, addr)
71 super(SubSocketChannel, self).__init__(context, session, address)
49
72
50 def run(self):
73 def run(self):
51 self.socket = self.context.socket(zmq.SUB)
74 self.socket = self.context.socket(zmq.SUB)
52 self.socket.setsockopt(zmq.SUBSCRIBE,'')
75 self.socket.setsockopt(zmq.SUBSCRIBE,'')
53 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
76 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
54 self.socket.connect('tcp://%s:%i' % self.addr)
77 self.socket.connect('tcp://%s:%i' % self.address)
55 self.ioloop = ioloop.IOLoop()
78 self.ioloop = ioloop.IOLoop()
56 self.ioloop.add_handler(self.socket, self._handle_events,
79 self.ioloop.add_handler(self.socket, self._handle_events,
57 POLLIN|POLLERR)
80 POLLIN|POLLERR)
58 self.ioloop.start()
81 self.ioloop.start()
59
82
83 def stop(self):
84 self.ioloop.stop()
85 self.join()
86
60 def _handle_events(self, socket, events):
87 def _handle_events(self, socket, events):
61 # Turn on and off POLLOUT depending on if we have made a request
88 # Turn on and off POLLOUT depending on if we have made a request
62 if events & POLLERR:
89 if events & POLLERR:
@@ -136,21 +163,25 b' class XReqSocketChannel(ZmqSocketChannel):'
136 handlers = None
163 handlers = None
137 _overriden_call_handler = None
164 _overriden_call_handler = None
138
165
139 def __init__(self, context, session, addr=None):
166 def __init__(self, context, session, address=None):
140 self.handlers = {}
167 self.handlers = {}
141 self.handler_queue = Queue()
168 self.handler_queue = Queue()
142 self.command_queue = Queue()
169 self.command_queue = Queue()
143 super(XReqSocketChannel, self).__init__(context, session, addr)
170 super(XReqSocketChannel, self).__init__(context, session, address)
144
171
145 def run(self):
172 def run(self):
146 self.socket = self.context.socket(zmq.XREQ)
173 self.socket = self.context.socket(zmq.XREQ)
147 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
174 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
148 self.socket.connect('tcp://%s:%i' % self.addr)
175 self.socket.connect('tcp://%s:%i' % self.address)
149 self.ioloop = ioloop.IOLoop()
176 self.ioloop = ioloop.IOLoop()
150 self.ioloop.add_handler(self.socket, self._handle_events,
177 self.ioloop.add_handler(self.socket, self._handle_events,
151 POLLIN|POLLOUT|POLLERR)
178 POLLIN|POLLOUT|POLLERR)
152 self.ioloop.start()
179 self.ioloop.start()
153
180
181 def stop(self):
182 self.ioloop.stop()
183 self.join()
184
154 def _handle_events(self, socket, events):
185 def _handle_events(self, socket, events):
155 # Turn on and off POLLOUT depending on if we have made a request
186 # Turn on and off POLLOUT depending on if we have made a request
156 if events & POLLERR:
187 if events & POLLERR:
@@ -246,7 +277,10 b' class XReqSocketChannel(ZmqSocketChannel):'
246
277
247 class RepSocketChannel(ZmqSocketChannel):
278 class RepSocketChannel(ZmqSocketChannel):
248
279
249 def on_raw_input():
280 def stop(self):
281 pass
282
283 def on_raw_input(self):
250 pass
284 pass
251
285
252
286
@@ -268,22 +302,10 b' class KernelManager(HasTraits):'
268 # The Session to use for communication with the kernel.
302 # The Session to use for communication with the kernel.
269 session = Instance(Session, ())
303 session = Instance(Session, ())
270
304
271 # The channels objects used for communication with the kernel.
272 # FIXME: Add '_traitname_default' instantiation method to Traitlets.
273 #sub_channel = Instance(SubSocketChannel)
274 #xreq_channel = Instance(XReqSocketChannel)
275 #rep_channel = Instance(RepSocketChannel)
276
277 # The classes to use for the various channels.
305 # The classes to use for the various channels.
278 sub_channel_class = Type(SubSocketChannel)
306 sub_channel_class = Type(SubSocketChannel)
279 xreq_channel_class = Type(XReqSocketChannel)
307 xreq_channel_class = Type(XReqSocketChannel)
280 rep_channel_class = Type(RepSocketChannel)
308 rep_channel_class = Type(RepSocketChannel)
281
282 # The addresses to use for the various channels. Should be tuples of form
283 # (ip_address, port).
284 #sub_address = DelegatesTo('sub_channel')
285 #xreq_address = DelegatesTo('xreq_channel')
286 #rep_address = DelegatesTo('rep_channel')
287
309
288 # Protected traits.
310 # Protected traits.
289 _sub_channel = Any
311 _sub_channel = Any
@@ -302,9 +324,16 b' class KernelManager(HasTraits):'
302 """Start a localhost kernel. If ports have been specified, use
324 """Start a localhost kernel. If ports have been specified, use
303 them. Otherwise, choose an open port at random.
325 them. Otherwise, choose an open port at random.
304 """
326 """
327 self.sub_channel.start()
328 self.xreq_channel.start()
329 self.rep_channel.start()
305
330
306 def kill_kernel(self):
331 def kill_kernel(self):
307 """Kill the running kernel"""
332 """Kill the running kernel.
333 """
334 self.sub_channel.stop()
335 self.xreq_channel.stop()
336 self.rep_channel.stop()
308
337
309 def is_alive(self):
338 def is_alive(self):
310 """Is the kernel alive?"""
339 """Is the kernel alive?"""
@@ -313,6 +342,10 b' class KernelManager(HasTraits):'
313 def signal_kernel(self, signum):
342 def signal_kernel(self, signum):
314 """Send signum to the kernel."""
343 """Send signum to the kernel."""
315
344
345 #--------------------------------------------------------------------------
346 # Channels used for communication with the kernel:
347 #--------------------------------------------------------------------------
348
316 @property
349 @property
317 def sub_channel(self):
350 def sub_channel(self):
318 """Get the SUB socket channel object."""
351 """Get the SUB socket channel object."""
@@ -337,23 +370,34 b' class KernelManager(HasTraits):'
337 self.session)
370 self.session)
338 return self._rep_channel
371 return self._rep_channel
339
372
373 #--------------------------------------------------------------------------
374 # Channel addresses:
375 #--------------------------------------------------------------------------
376
340 def get_sub_address(self):
377 def get_sub_address(self):
341 return self.sub_channel.addr
378 return self.sub_channel.address
342 def set_sub_address(self, addr):
379
343 self.sub_channel.addr = addr
380 def set_sub_address(self, address):
381 self.sub_channel.address = address
382
344 sub_address = property(get_sub_address, set_sub_address,
383 sub_address = property(get_sub_address, set_sub_address,
345 doc="The address used by SUB socket channel.")
384 doc="The address used by SUB socket channel.")
346
385
347 def get_xreq_address(self):
386 def get_xreq_address(self):
348 return self.xreq_channel.addr
387 return self.xreq_channel.address
349 def set_xreq_address(self, addr):
388
350 self.xreq_channel.addr = addr
389 def set_xreq_address(self, address):
390 self.xreq_channel.address = address
391
351 xreq_address = property(get_xreq_address, set_xreq_address,
392 xreq_address = property(get_xreq_address, set_xreq_address,
352 doc="The address used by XREQ socket channel.")
393 doc="The address used by XREQ socket channel.")
353
394
354 def get_rep_address(self):
395 def get_rep_address(self):
355 return self.rep_channel.addr
396 return self.rep_channel.address
356 def set_rep_address(self, addr):
397
357 self.rep_channel.addr = addr
398 def set_rep_address(self, address):
399 self.rep_channel.address = address
400
358 rep_address = property(get_rep_address, set_rep_address,
401 rep_address = property(get_rep_address, set_rep_address,
359 doc="The address used by REP socket channel.")
402 doc="The address used by REP socket channel.")
403
General Comments 0
You need to be logged in to leave comments. Login now