##// END OF EJS Templates
use pyzmq tools where appropriate...
MinRK -
Show More
@@ -32,8 +32,7 b' import zmq'
32 32 # Install the pyzmq ioloop. This has to be done before anything else from
33 33 # tornado is imported.
34 34 from zmq.eventloop import ioloop
35 import tornado.ioloop
36 tornado.ioloop.IOLoop = ioloop.IOLoop
35 ioloop.install()
37 36
38 37 from tornado import httpserver
39 38 from tornado import web
@@ -18,7 +18,6 b' TODO'
18 18 # Standard library imports.
19 19 import errno
20 20 import json
21 from Queue import Queue, Empty
22 21 from subprocess import Popen
23 22 import os
24 23 import signal
@@ -28,8 +27,7 b' import time'
28 27
29 28 # System library imports.
30 29 import zmq
31 from zmq import POLLIN, POLLOUT, POLLERR
32 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop, zmqstream
33 31
34 32 # Local imports.
35 33 from IPython.config.loader import Config
@@ -88,7 +86,7 b' class ZMQSocketChannel(Thread):'
88 86 session = None
89 87 socket = None
90 88 ioloop = None
91 iostate = None
89 stream = None
92 90 _address = None
93 91
94 92 def __init__(self, context, session, address):
@@ -144,37 +142,28 b' class ZMQSocketChannel(Thread):'
144 142 """
145 143 return self._address
146 144
147 def add_io_state(self, state):
148 """Add IO state to the eventloop.
149
145 def _queue_send(self, msg):
146 """Queue a message to be sent from the IOLoop's thread.
147
150 148 Parameters
151 149 ----------
152 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
153 The IO state flag to set.
154
155 This is thread safe as it uses the thread safe IOLoop.add_callback.
150 msg : message to send
151
152 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
153 thread control of the action.
156 154 """
157 def add_io_state_callback():
158 if not self.iostate & state:
159 self.iostate = self.iostate | state
160 self.ioloop.update_handler(self.socket, self.iostate)
161 self.ioloop.add_callback(add_io_state_callback)
162
163 def drop_io_state(self, state):
164 """Drop IO state from the eventloop.
165
166 Parameters
167 ----------
168 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
169 The IO state flag to set.
155 def thread_send():
156 self.session.send(self.stream, msg)
157 self.ioloop.add_callback(thread_send)
170 158
171 This is thread safe as it uses the thread safe IOLoop.add_callback.
159 def _handle_recv(self, msg):
160 """callback for stream.on_recv
161
162 unpacks message, and calls handlers with it.
172 163 """
173 def drop_io_state_callback():
174 if self.iostate & state:
175 self.iostate = self.iostate & (~state)
176 self.ioloop.update_handler(self.socket, self.iostate)
177 self.ioloop.add_callback(drop_io_state_callback)
164 ident,smsg = self.session.feed_identities(msg)
165 self.call_handlers(self.session.unserialize(smsg))
166
178 167
179 168
180 169 class ShellSocketChannel(ZMQSocketChannel):
@@ -187,7 +176,6 b' class ShellSocketChannel(ZMQSocketChannel):'
187 176
188 177 def __init__(self, context, session, address):
189 178 super(ShellSocketChannel, self).__init__(context, session, address)
190 self.command_queue = Queue()
191 179 self.ioloop = ioloop.IOLoop()
192 180
193 181 def run(self):
@@ -195,9 +183,8 b' class ShellSocketChannel(ZMQSocketChannel):'
195 183 self.socket = self.context.socket(zmq.DEALER)
196 184 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
197 185 self.socket.connect('tcp://%s:%i' % self.address)
198 self.iostate = POLLERR|POLLIN
199 self.ioloop.add_handler(self.socket, self._handle_events,
200 self.iostate)
186 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
187 self.stream.on_recv(self._handle_recv)
201 188 self._run_loop()
202 189
203 190 def stop(self):
@@ -268,7 +255,7 b' class ShellSocketChannel(ZMQSocketChannel):'
268 255 allow_stdin=allow_stdin,
269 256 )
270 257 msg = self.session.msg('execute_request', content)
271 self._queue_request(msg)
258 self._queue_send(msg)
272 259 return msg['header']['msg_id']
273 260
274 261 def complete(self, text, line, cursor_pos, block=None):
@@ -293,7 +280,7 b' class ShellSocketChannel(ZMQSocketChannel):'
293 280 """
294 281 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
295 282 msg = self.session.msg('complete_request', content)
296 self._queue_request(msg)
283 self._queue_send(msg)
297 284 return msg['header']['msg_id']
298 285
299 286 def object_info(self, oname):
@@ -310,7 +297,7 b' class ShellSocketChannel(ZMQSocketChannel):'
310 297 """
311 298 content = dict(oname=oname)
312 299 msg = self.session.msg('object_info_request', content)
313 self._queue_request(msg)
300 self._queue_send(msg)
314 301 return msg['header']['msg_id']
315 302
316 303 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
@@ -348,7 +335,7 b' class ShellSocketChannel(ZMQSocketChannel):'
348 335 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
349 336 **kwargs)
350 337 msg = self.session.msg('history_request', content)
351 self._queue_request(msg)
338 self._queue_send(msg)
352 339 return msg['header']['msg_id']
353 340
354 341 def shutdown(self, restart=False):
@@ -365,38 +352,9 b' class ShellSocketChannel(ZMQSocketChannel):'
365 352 # Send quit message to kernel. Once we implement kernel-side setattr,
366 353 # this should probably be done that way, but for now this will do.
367 354 msg = self.session.msg('shutdown_request', {'restart':restart})
368 self._queue_request(msg)
355 self._queue_send(msg)
369 356 return msg['header']['msg_id']
370 357
371 def _handle_events(self, socket, events):
372 if events & POLLERR:
373 self._handle_err()
374 if events & POLLOUT:
375 self._handle_send()
376 if events & POLLIN:
377 self._handle_recv()
378
379 def _handle_recv(self):
380 ident,msg = self.session.recv(self.socket, 0)
381 self.call_handlers(msg)
382
383 def _handle_send(self):
384 try:
385 msg = self.command_queue.get(False)
386 except Empty:
387 pass
388 else:
389 self.session.send(self.socket,msg)
390 if self.command_queue.empty():
391 self.drop_io_state(POLLOUT)
392
393 def _handle_err(self):
394 # We don't want to let this go silently, so eventually we should log.
395 raise zmq.ZMQError()
396
397 def _queue_request(self, msg):
398 self.command_queue.put(msg)
399 self.add_io_state(POLLOUT)
400 358
401 359
402 360 class SubSocketChannel(ZMQSocketChannel):
@@ -413,9 +371,8 b' class SubSocketChannel(ZMQSocketChannel):'
413 371 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
414 372 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
415 373 self.socket.connect('tcp://%s:%i' % self.address)
416 self.iostate = POLLIN|POLLERR
417 self.ioloop.add_handler(self.socket, self._handle_events,
418 self.iostate)
374 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
375 self.stream.on_recv(self._handle_recv)
419 376 self._run_loop()
420 377
421 378 def stop(self):
@@ -456,33 +413,9 b' class SubSocketChannel(ZMQSocketChannel):'
456 413 while not self._flushed and time.time() < stop_time:
457 414 time.sleep(0.01)
458 415
459 def _handle_events(self, socket, events):
460 # Turn on and off POLLOUT depending on if we have made a request
461 if events & POLLERR:
462 self._handle_err()
463 if events & POLLIN:
464 self._handle_recv()
465
466 def _handle_err(self):
467 # We don't want to let this go silently, so eventually we should log.
468 raise zmq.ZMQError()
469
470 def _handle_recv(self):
471 # Get all of the messages we can
472 while True:
473 try:
474 ident,msg = self.session.recv(self.socket)
475 except zmq.ZMQError:
476 # Check the errno?
477 # Will this trigger POLLERR?
478 break
479 else:
480 if msg is None:
481 break
482 self.call_handlers(msg)
483
484 416 def _flush(self):
485 417 """Callback for :method:`self.flush`."""
418 self.stream.flush()
486 419 self._flushed = True
487 420
488 421
@@ -494,16 +427,14 b' class StdInSocketChannel(ZMQSocketChannel):'
494 427 def __init__(self, context, session, address):
495 428 super(StdInSocketChannel, self).__init__(context, session, address)
496 429 self.ioloop = ioloop.IOLoop()
497 self.msg_queue = Queue()
498 430
499 431 def run(self):
500 432 """The thread's main activity. Call start() instead."""
501 433 self.socket = self.context.socket(zmq.DEALER)
502 434 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
503 435 self.socket.connect('tcp://%s:%i' % self.address)
504 self.iostate = POLLERR|POLLIN
505 self.ioloop.add_handler(self.socket, self._handle_events,
506 self.iostate)
436 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
437 self.stream.on_recv(self._handle_recv)
507 438 self._run_loop()
508 439
509 440 def stop(self):
@@ -524,37 +455,7 b' class StdInSocketChannel(ZMQSocketChannel):'
524 455 """Send a string of raw input to the kernel."""
525 456 content = dict(value=string)
526 457 msg = self.session.msg('input_reply', content)
527 self._queue_reply(msg)
528
529 def _handle_events(self, socket, events):
530 if events & POLLERR:
531 self._handle_err()
532 if events & POLLOUT:
533 self._handle_send()
534 if events & POLLIN:
535 self._handle_recv()
536
537 def _handle_recv(self):
538 ident,msg = self.session.recv(self.socket, 0)
539 self.call_handlers(msg)
540
541 def _handle_send(self):
542 try:
543 msg = self.msg_queue.get(False)
544 except Empty:
545 pass
546 else:
547 self.session.send(self.socket,msg)
548 if self.msg_queue.empty():
549 self.drop_io_state(POLLOUT)
550
551 def _handle_err(self):
552 # We don't want to let this go silently, so eventually we should log.
553 raise zmq.ZMQError()
554
555 def _queue_reply(self, msg):
556 self.msg_queue.put(msg)
557 self.add_io_state(POLLOUT)
458 self._queue_send(msg)
558 459
559 460
560 461 class HBSocketChannel(ZMQSocketChannel):
General Comments 0
You need to be logged in to leave comments. Login now