Show More
@@ -32,8 +32,7 b' import zmq' | |||
|
32 | 32 | # Install the pyzmq ioloop. This has to be done before anything else from |
|
33 | 33 | # tornado is imported. |
|
34 | 34 | from zmq.eventloop import ioloop |
|
35 | import tornado.ioloop | |
|
36 | tornado.ioloop.IOLoop = ioloop.IOLoop | |
|
35 | ioloop.install() | |
|
37 | 36 | |
|
38 | 37 | from tornado import httpserver |
|
39 | 38 | from tornado import web |
@@ -18,7 +18,6 b' TODO' | |||
|
18 | 18 | # Standard library imports. |
|
19 | 19 | import errno |
|
20 | 20 | import json |
|
21 | from Queue import Queue, Empty | |
|
22 | 21 | from subprocess import Popen |
|
23 | 22 | import os |
|
24 | 23 | import signal |
@@ -28,8 +27,7 b' import time' | |||
|
28 | 27 | |
|
29 | 28 | # System library imports. |
|
30 | 29 | import zmq |
|
31 | from zmq import POLLIN, POLLOUT, POLLERR | |
|
32 | from zmq.eventloop import ioloop | |
|
30 | from zmq.eventloop import ioloop, zmqstream | |
|
33 | 31 | |
|
34 | 32 | # Local imports. |
|
35 | 33 | from IPython.config.loader import Config |
@@ -88,7 +86,7 b' class ZMQSocketChannel(Thread):' | |||
|
88 | 86 | session = None |
|
89 | 87 | socket = None |
|
90 | 88 | ioloop = None |
|
91 |
|
|
|
89 | stream = None | |
|
92 | 90 | _address = None |
|
93 | 91 | |
|
94 | 92 | def __init__(self, context, session, address): |
@@ -144,37 +142,28 b' class ZMQSocketChannel(Thread):' | |||
|
144 | 142 | """ |
|
145 | 143 | return self._address |
|
146 | 144 | |
|
147 |
def |
|
|
148 | """Add IO state to the eventloop. | |
|
145 | def _queue_send(self, msg): | |
|
146 | """Queue a message to be sent from the IOLoop's thread. | |
|
149 | 147 | |
|
150 | 148 | Parameters |
|
151 | 149 | ---------- |
|
152 | state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR | |
|
153 | The IO state flag to set. | |
|
150 | msg : message to send | |
|
154 | 151 | |
|
155 |
This is thread |
|
|
152 | This is threadsafe, as it uses IOLoop.add_callback to give the loop's | |
|
153 | thread control of the action. | |
|
156 | 154 | """ |
|
157 |
def |
|
|
158 | if not self.iostate & state: | |
|
159 | self.iostate = self.iostate | state | |
|
160 | self.ioloop.update_handler(self.socket, self.iostate) | |
|
161 | self.ioloop.add_callback(add_io_state_callback) | |
|
155 | def thread_send(): | |
|
156 | self.session.send(self.stream, msg) | |
|
157 | self.ioloop.add_callback(thread_send) | |
|
162 | 158 | |
|
163 |
def |
|
|
164 | """Drop IO state from the eventloop. | |
|
159 | def _handle_recv(self, msg): | |
|
160 | """callback for stream.on_recv | |
|
165 | 161 | |
|
166 | Parameters | |
|
167 | ---------- | |
|
168 | state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR | |
|
169 | The IO state flag to set. | |
|
170 | ||
|
171 | This is thread safe as it uses the thread safe IOLoop.add_callback. | |
|
162 | unpacks message, and calls handlers with it. | |
|
172 | 163 | """ |
|
173 | def drop_io_state_callback(): | |
|
174 | if self.iostate & state: | |
|
175 | self.iostate = self.iostate & (~state) | |
|
176 | self.ioloop.update_handler(self.socket, self.iostate) | |
|
177 | self.ioloop.add_callback(drop_io_state_callback) | |
|
164 | ident,smsg = self.session.feed_identities(msg) | |
|
165 | self.call_handlers(self.session.unserialize(smsg)) | |
|
166 | ||
|
178 | 167 | |
|
179 | 168 | |
|
180 | 169 | class ShellSocketChannel(ZMQSocketChannel): |
@@ -187,7 +176,6 b' class ShellSocketChannel(ZMQSocketChannel):' | |||
|
187 | 176 | |
|
188 | 177 | def __init__(self, context, session, address): |
|
189 | 178 | super(ShellSocketChannel, self).__init__(context, session, address) |
|
190 | self.command_queue = Queue() | |
|
191 | 179 | self.ioloop = ioloop.IOLoop() |
|
192 | 180 | |
|
193 | 181 | def run(self): |
@@ -195,9 +183,8 b' class ShellSocketChannel(ZMQSocketChannel):' | |||
|
195 | 183 | self.socket = self.context.socket(zmq.DEALER) |
|
196 | 184 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) |
|
197 | 185 | self.socket.connect('tcp://%s:%i' % self.address) |
|
198 | self.iostate = POLLERR|POLLIN | |
|
199 | self.ioloop.add_handler(self.socket, self._handle_events, | |
|
200 | self.iostate) | |
|
186 | self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) | |
|
187 | self.stream.on_recv(self._handle_recv) | |
|
201 | 188 | self._run_loop() |
|
202 | 189 | |
|
203 | 190 | def stop(self): |
@@ -268,7 +255,7 b' class ShellSocketChannel(ZMQSocketChannel):' | |||
|
268 | 255 | allow_stdin=allow_stdin, |
|
269 | 256 | ) |
|
270 | 257 | msg = self.session.msg('execute_request', content) |
|
271 |
self._queue_ |
|
|
258 | self._queue_send(msg) | |
|
272 | 259 | return msg['header']['msg_id'] |
|
273 | 260 | |
|
274 | 261 | def complete(self, text, line, cursor_pos, block=None): |
@@ -293,7 +280,7 b' class ShellSocketChannel(ZMQSocketChannel):' | |||
|
293 | 280 | """ |
|
294 | 281 | content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos) |
|
295 | 282 | msg = self.session.msg('complete_request', content) |
|
296 |
self._queue_ |
|
|
283 | self._queue_send(msg) | |
|
297 | 284 | return msg['header']['msg_id'] |
|
298 | 285 | |
|
299 | 286 | def object_info(self, oname): |
@@ -310,7 +297,7 b' class ShellSocketChannel(ZMQSocketChannel):' | |||
|
310 | 297 | """ |
|
311 | 298 | content = dict(oname=oname) |
|
312 | 299 | msg = self.session.msg('object_info_request', content) |
|
313 |
self._queue_ |
|
|
300 | self._queue_send(msg) | |
|
314 | 301 | return msg['header']['msg_id'] |
|
315 | 302 | |
|
316 | 303 | def history(self, raw=True, output=False, hist_access_type='range', **kwargs): |
@@ -348,7 +335,7 b' class ShellSocketChannel(ZMQSocketChannel):' | |||
|
348 | 335 | content = dict(raw=raw, output=output, hist_access_type=hist_access_type, |
|
349 | 336 | **kwargs) |
|
350 | 337 | msg = self.session.msg('history_request', content) |
|
351 |
self._queue_ |
|
|
338 | self._queue_send(msg) | |
|
352 | 339 | return msg['header']['msg_id'] |
|
353 | 340 | |
|
354 | 341 | def shutdown(self, restart=False): |
@@ -365,38 +352,9 b' class ShellSocketChannel(ZMQSocketChannel):' | |||
|
365 | 352 | # Send quit message to kernel. Once we implement kernel-side setattr, |
|
366 | 353 | # this should probably be done that way, but for now this will do. |
|
367 | 354 | msg = self.session.msg('shutdown_request', {'restart':restart}) |
|
368 |
self._queue_ |
|
|
355 | self._queue_send(msg) | |
|
369 | 356 | return msg['header']['msg_id'] |
|
370 | 357 | |
|
371 | def _handle_events(self, socket, events): | |
|
372 | if events & POLLERR: | |
|
373 | self._handle_err() | |
|
374 | if events & POLLOUT: | |
|
375 | self._handle_send() | |
|
376 | if events & POLLIN: | |
|
377 | self._handle_recv() | |
|
378 | ||
|
379 | def _handle_recv(self): | |
|
380 | ident,msg = self.session.recv(self.socket, 0) | |
|
381 | self.call_handlers(msg) | |
|
382 | ||
|
383 | def _handle_send(self): | |
|
384 | try: | |
|
385 | msg = self.command_queue.get(False) | |
|
386 | except Empty: | |
|
387 | pass | |
|
388 | else: | |
|
389 | self.session.send(self.socket,msg) | |
|
390 | if self.command_queue.empty(): | |
|
391 | self.drop_io_state(POLLOUT) | |
|
392 | ||
|
393 | def _handle_err(self): | |
|
394 | # We don't want to let this go silently, so eventually we should log. | |
|
395 | raise zmq.ZMQError() | |
|
396 | ||
|
397 | def _queue_request(self, msg): | |
|
398 | self.command_queue.put(msg) | |
|
399 | self.add_io_state(POLLOUT) | |
|
400 | 358 | |
|
401 | 359 | |
|
402 | 360 | class SubSocketChannel(ZMQSocketChannel): |
@@ -413,9 +371,8 b' class SubSocketChannel(ZMQSocketChannel):' | |||
|
413 | 371 | self.socket.setsockopt(zmq.SUBSCRIBE,b'') |
|
414 | 372 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) |
|
415 | 373 | self.socket.connect('tcp://%s:%i' % self.address) |
|
416 | self.iostate = POLLIN|POLLERR | |
|
417 | self.ioloop.add_handler(self.socket, self._handle_events, | |
|
418 | self.iostate) | |
|
374 | self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) | |
|
375 | self.stream.on_recv(self._handle_recv) | |
|
419 | 376 | self._run_loop() |
|
420 | 377 | |
|
421 | 378 | def stop(self): |
@@ -456,33 +413,9 b' class SubSocketChannel(ZMQSocketChannel):' | |||
|
456 | 413 | while not self._flushed and time.time() < stop_time: |
|
457 | 414 | time.sleep(0.01) |
|
458 | 415 | |
|
459 | def _handle_events(self, socket, events): | |
|
460 | # Turn on and off POLLOUT depending on if we have made a request | |
|
461 | if events & POLLERR: | |
|
462 | self._handle_err() | |
|
463 | if events & POLLIN: | |
|
464 | self._handle_recv() | |
|
465 | ||
|
466 | def _handle_err(self): | |
|
467 | # We don't want to let this go silently, so eventually we should log. | |
|
468 | raise zmq.ZMQError() | |
|
469 | ||
|
470 | def _handle_recv(self): | |
|
471 | # Get all of the messages we can | |
|
472 | while True: | |
|
473 | try: | |
|
474 | ident,msg = self.session.recv(self.socket) | |
|
475 | except zmq.ZMQError: | |
|
476 | # Check the errno? | |
|
477 | # Will this trigger POLLERR? | |
|
478 | break | |
|
479 | else: | |
|
480 | if msg is None: | |
|
481 | break | |
|
482 | self.call_handlers(msg) | |
|
483 | ||
|
484 | 416 | def _flush(self): |
|
485 | 417 | """Callback for :method:`self.flush`.""" |
|
418 | self.stream.flush() | |
|
486 | 419 | self._flushed = True |
|
487 | 420 | |
|
488 | 421 | |
@@ -494,16 +427,14 b' class StdInSocketChannel(ZMQSocketChannel):' | |||
|
494 | 427 | def __init__(self, context, session, address): |
|
495 | 428 | super(StdInSocketChannel, self).__init__(context, session, address) |
|
496 | 429 | self.ioloop = ioloop.IOLoop() |
|
497 | self.msg_queue = Queue() | |
|
498 | 430 | |
|
499 | 431 | def run(self): |
|
500 | 432 | """The thread's main activity. Call start() instead.""" |
|
501 | 433 | self.socket = self.context.socket(zmq.DEALER) |
|
502 | 434 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) |
|
503 | 435 | self.socket.connect('tcp://%s:%i' % self.address) |
|
504 | self.iostate = POLLERR|POLLIN | |
|
505 | self.ioloop.add_handler(self.socket, self._handle_events, | |
|
506 | self.iostate) | |
|
436 | self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) | |
|
437 | self.stream.on_recv(self._handle_recv) | |
|
507 | 438 | self._run_loop() |
|
508 | 439 | |
|
509 | 440 | def stop(self): |
@@ -524,37 +455,7 b' class StdInSocketChannel(ZMQSocketChannel):' | |||
|
524 | 455 | """Send a string of raw input to the kernel.""" |
|
525 | 456 | content = dict(value=string) |
|
526 | 457 | msg = self.session.msg('input_reply', content) |
|
527 |
self._queue_ |
|
|
528 | ||
|
529 | def _handle_events(self, socket, events): | |
|
530 | if events & POLLERR: | |
|
531 | self._handle_err() | |
|
532 | if events & POLLOUT: | |
|
533 | self._handle_send() | |
|
534 | if events & POLLIN: | |
|
535 | self._handle_recv() | |
|
536 | ||
|
537 | def _handle_recv(self): | |
|
538 | ident,msg = self.session.recv(self.socket, 0) | |
|
539 | self.call_handlers(msg) | |
|
540 | ||
|
541 | def _handle_send(self): | |
|
542 | try: | |
|
543 | msg = self.msg_queue.get(False) | |
|
544 | except Empty: | |
|
545 | pass | |
|
546 | else: | |
|
547 | self.session.send(self.socket,msg) | |
|
548 | if self.msg_queue.empty(): | |
|
549 | self.drop_io_state(POLLOUT) | |
|
550 | ||
|
551 | def _handle_err(self): | |
|
552 | # We don't want to let this go silently, so eventually we should log. | |
|
553 | raise zmq.ZMQError() | |
|
554 | ||
|
555 | def _queue_reply(self, msg): | |
|
556 | self.msg_queue.put(msg) | |
|
557 | self.add_io_state(POLLOUT) | |
|
458 | self._queue_send(msg) | |
|
558 | 459 | |
|
559 | 460 | |
|
560 | 461 | class HBSocketChannel(ZMQSocketChannel): |
General Comments 0
You need to be logged in to leave comments.
Login now