##// END OF EJS Templates
use pyzmq tools where appropriate...
MinRK -
Show More
@@ -32,8 +32,7 b' import zmq'
32 # Install the pyzmq ioloop. This has to be done before anything else from
32 # Install the pyzmq ioloop. This has to be done before anything else from
33 # tornado is imported.
33 # tornado is imported.
34 from zmq.eventloop import ioloop
34 from zmq.eventloop import ioloop
35 import tornado.ioloop
35 ioloop.install()
36 tornado.ioloop.IOLoop = ioloop.IOLoop
37
36
38 from tornado import httpserver
37 from tornado import httpserver
39 from tornado import web
38 from tornado import web
@@ -18,7 +18,6 b' TODO'
18 # Standard library imports.
18 # Standard library imports.
19 import errno
19 import errno
20 import json
20 import json
21 from Queue import Queue, Empty
22 from subprocess import Popen
21 from subprocess import Popen
23 import os
22 import os
24 import signal
23 import signal
@@ -28,8 +27,7 b' import time'
28
27
29 # System library imports.
28 # System library imports.
30 import zmq
29 import zmq
31 from zmq import POLLIN, POLLOUT, POLLERR
30 from zmq.eventloop import ioloop, zmqstream
32 from zmq.eventloop import ioloop
33
31
34 # Local imports.
32 # Local imports.
35 from IPython.config.loader import Config
33 from IPython.config.loader import Config
@@ -88,7 +86,7 b' class ZMQSocketChannel(Thread):'
88 session = None
86 session = None
89 socket = None
87 socket = None
90 ioloop = None
88 ioloop = None
91 iostate = None
89 stream = None
92 _address = None
90 _address = None
93
91
94 def __init__(self, context, session, address):
92 def __init__(self, context, session, address):
@@ -144,37 +142,28 b' class ZMQSocketChannel(Thread):'
144 """
142 """
145 return self._address
143 return self._address
146
144
147 def add_io_state(self, state):
145 def _queue_send(self, msg):
148 """Add IO state to the eventloop.
146 """Queue a message to be sent from the IOLoop's thread.
149
147
150 Parameters
148 Parameters
151 ----------
149 ----------
152 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 msg : message to send
153 The IO state flag to set.
151
154
152 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
155 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 thread control of the action.
156 """
154 """
157 def add_io_state_callback():
155 def thread_send():
158 if not self.iostate & state:
156 self.session.send(self.stream, msg)
159 self.iostate = self.iostate | state
157 self.ioloop.add_callback(thread_send)
160 self.ioloop.update_handler(self.socket, self.iostate)
161 self.ioloop.add_callback(add_io_state_callback)
162
163 def drop_io_state(self, state):
164 """Drop IO state from the eventloop.
165
166 Parameters
167 ----------
168 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
169 The IO state flag to set.
170
158
171 This is thread safe as it uses the thread safe IOLoop.add_callback.
159 def _handle_recv(self, msg):
160 """callback for stream.on_recv
161
162 unpacks message, and calls handlers with it.
172 """
163 """
173 def drop_io_state_callback():
164 ident,smsg = self.session.feed_identities(msg)
174 if self.iostate & state:
165 self.call_handlers(self.session.unserialize(smsg))
175 self.iostate = self.iostate & (~state)
166
176 self.ioloop.update_handler(self.socket, self.iostate)
177 self.ioloop.add_callback(drop_io_state_callback)
178
167
179
168
180 class ShellSocketChannel(ZMQSocketChannel):
169 class ShellSocketChannel(ZMQSocketChannel):
@@ -187,7 +176,6 b' class ShellSocketChannel(ZMQSocketChannel):'
187
176
188 def __init__(self, context, session, address):
177 def __init__(self, context, session, address):
189 super(ShellSocketChannel, self).__init__(context, session, address)
178 super(ShellSocketChannel, self).__init__(context, session, address)
190 self.command_queue = Queue()
191 self.ioloop = ioloop.IOLoop()
179 self.ioloop = ioloop.IOLoop()
192
180
193 def run(self):
181 def run(self):
@@ -195,9 +183,8 b' class ShellSocketChannel(ZMQSocketChannel):'
195 self.socket = self.context.socket(zmq.DEALER)
183 self.socket = self.context.socket(zmq.DEALER)
196 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
184 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
197 self.socket.connect('tcp://%s:%i' % self.address)
185 self.socket.connect('tcp://%s:%i' % self.address)
198 self.iostate = POLLERR|POLLIN
186 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
199 self.ioloop.add_handler(self.socket, self._handle_events,
187 self.stream.on_recv(self._handle_recv)
200 self.iostate)
201 self._run_loop()
188 self._run_loop()
202
189
203 def stop(self):
190 def stop(self):
@@ -268,7 +255,7 b' class ShellSocketChannel(ZMQSocketChannel):'
268 allow_stdin=allow_stdin,
255 allow_stdin=allow_stdin,
269 )
256 )
270 msg = self.session.msg('execute_request', content)
257 msg = self.session.msg('execute_request', content)
271 self._queue_request(msg)
258 self._queue_send(msg)
272 return msg['header']['msg_id']
259 return msg['header']['msg_id']
273
260
274 def complete(self, text, line, cursor_pos, block=None):
261 def complete(self, text, line, cursor_pos, block=None):
@@ -293,7 +280,7 b' class ShellSocketChannel(ZMQSocketChannel):'
293 """
280 """
294 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
281 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
295 msg = self.session.msg('complete_request', content)
282 msg = self.session.msg('complete_request', content)
296 self._queue_request(msg)
283 self._queue_send(msg)
297 return msg['header']['msg_id']
284 return msg['header']['msg_id']
298
285
299 def object_info(self, oname):
286 def object_info(self, oname):
@@ -310,7 +297,7 b' class ShellSocketChannel(ZMQSocketChannel):'
310 """
297 """
311 content = dict(oname=oname)
298 content = dict(oname=oname)
312 msg = self.session.msg('object_info_request', content)
299 msg = self.session.msg('object_info_request', content)
313 self._queue_request(msg)
300 self._queue_send(msg)
314 return msg['header']['msg_id']
301 return msg['header']['msg_id']
315
302
316 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
303 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
@@ -348,7 +335,7 b' class ShellSocketChannel(ZMQSocketChannel):'
348 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
335 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
349 **kwargs)
336 **kwargs)
350 msg = self.session.msg('history_request', content)
337 msg = self.session.msg('history_request', content)
351 self._queue_request(msg)
338 self._queue_send(msg)
352 return msg['header']['msg_id']
339 return msg['header']['msg_id']
353
340
354 def shutdown(self, restart=False):
341 def shutdown(self, restart=False):
@@ -365,38 +352,9 b' class ShellSocketChannel(ZMQSocketChannel):'
365 # Send quit message to kernel. Once we implement kernel-side setattr,
352 # Send quit message to kernel. Once we implement kernel-side setattr,
366 # this should probably be done that way, but for now this will do.
353 # this should probably be done that way, but for now this will do.
367 msg = self.session.msg('shutdown_request', {'restart':restart})
354 msg = self.session.msg('shutdown_request', {'restart':restart})
368 self._queue_request(msg)
355 self._queue_send(msg)
369 return msg['header']['msg_id']
356 return msg['header']['msg_id']
370
357
371 def _handle_events(self, socket, events):
372 if events & POLLERR:
373 self._handle_err()
374 if events & POLLOUT:
375 self._handle_send()
376 if events & POLLIN:
377 self._handle_recv()
378
379 def _handle_recv(self):
380 ident,msg = self.session.recv(self.socket, 0)
381 self.call_handlers(msg)
382
383 def _handle_send(self):
384 try:
385 msg = self.command_queue.get(False)
386 except Empty:
387 pass
388 else:
389 self.session.send(self.socket,msg)
390 if self.command_queue.empty():
391 self.drop_io_state(POLLOUT)
392
393 def _handle_err(self):
394 # We don't want to let this go silently, so eventually we should log.
395 raise zmq.ZMQError()
396
397 def _queue_request(self, msg):
398 self.command_queue.put(msg)
399 self.add_io_state(POLLOUT)
400
358
401
359
402 class SubSocketChannel(ZMQSocketChannel):
360 class SubSocketChannel(ZMQSocketChannel):
@@ -413,9 +371,8 b' class SubSocketChannel(ZMQSocketChannel):'
413 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
371 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
414 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
372 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
415 self.socket.connect('tcp://%s:%i' % self.address)
373 self.socket.connect('tcp://%s:%i' % self.address)
416 self.iostate = POLLIN|POLLERR
374 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
417 self.ioloop.add_handler(self.socket, self._handle_events,
375 self.stream.on_recv(self._handle_recv)
418 self.iostate)
419 self._run_loop()
376 self._run_loop()
420
377
421 def stop(self):
378 def stop(self):
@@ -456,33 +413,9 b' class SubSocketChannel(ZMQSocketChannel):'
456 while not self._flushed and time.time() < stop_time:
413 while not self._flushed and time.time() < stop_time:
457 time.sleep(0.01)
414 time.sleep(0.01)
458
415
459 def _handle_events(self, socket, events):
460 # Turn on and off POLLOUT depending on if we have made a request
461 if events & POLLERR:
462 self._handle_err()
463 if events & POLLIN:
464 self._handle_recv()
465
466 def _handle_err(self):
467 # We don't want to let this go silently, so eventually we should log.
468 raise zmq.ZMQError()
469
470 def _handle_recv(self):
471 # Get all of the messages we can
472 while True:
473 try:
474 ident,msg = self.session.recv(self.socket)
475 except zmq.ZMQError:
476 # Check the errno?
477 # Will this trigger POLLERR?
478 break
479 else:
480 if msg is None:
481 break
482 self.call_handlers(msg)
483
484 def _flush(self):
416 def _flush(self):
485 """Callback for :method:`self.flush`."""
417 """Callback for :method:`self.flush`."""
418 self.stream.flush()
486 self._flushed = True
419 self._flushed = True
487
420
488
421
@@ -494,16 +427,14 b' class StdInSocketChannel(ZMQSocketChannel):'
494 def __init__(self, context, session, address):
427 def __init__(self, context, session, address):
495 super(StdInSocketChannel, self).__init__(context, session, address)
428 super(StdInSocketChannel, self).__init__(context, session, address)
496 self.ioloop = ioloop.IOLoop()
429 self.ioloop = ioloop.IOLoop()
497 self.msg_queue = Queue()
498
430
499 def run(self):
431 def run(self):
500 """The thread's main activity. Call start() instead."""
432 """The thread's main activity. Call start() instead."""
501 self.socket = self.context.socket(zmq.DEALER)
433 self.socket = self.context.socket(zmq.DEALER)
502 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
434 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
503 self.socket.connect('tcp://%s:%i' % self.address)
435 self.socket.connect('tcp://%s:%i' % self.address)
504 self.iostate = POLLERR|POLLIN
436 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
505 self.ioloop.add_handler(self.socket, self._handle_events,
437 self.stream.on_recv(self._handle_recv)
506 self.iostate)
507 self._run_loop()
438 self._run_loop()
508
439
509 def stop(self):
440 def stop(self):
@@ -524,37 +455,7 b' class StdInSocketChannel(ZMQSocketChannel):'
524 """Send a string of raw input to the kernel."""
455 """Send a string of raw input to the kernel."""
525 content = dict(value=string)
456 content = dict(value=string)
526 msg = self.session.msg('input_reply', content)
457 msg = self.session.msg('input_reply', content)
527 self._queue_reply(msg)
458 self._queue_send(msg)
528
529 def _handle_events(self, socket, events):
530 if events & POLLERR:
531 self._handle_err()
532 if events & POLLOUT:
533 self._handle_send()
534 if events & POLLIN:
535 self._handle_recv()
536
537 def _handle_recv(self):
538 ident,msg = self.session.recv(self.socket, 0)
539 self.call_handlers(msg)
540
541 def _handle_send(self):
542 try:
543 msg = self.msg_queue.get(False)
544 except Empty:
545 pass
546 else:
547 self.session.send(self.socket,msg)
548 if self.msg_queue.empty():
549 self.drop_io_state(POLLOUT)
550
551 def _handle_err(self):
552 # We don't want to let this go silently, so eventually we should log.
553 raise zmq.ZMQError()
554
555 def _queue_reply(self, msg):
556 self.msg_queue.put(msg)
557 self.add_io_state(POLLOUT)
558
459
559
460
560 class HBSocketChannel(ZMQSocketChannel):
461 class HBSocketChannel(ZMQSocketChannel):
General Comments 0
You need to be logged in to leave comments. Login now