Show More
@@ -32,8 +32,7 b' import zmq' | |||||
32 | # Install the pyzmq ioloop. This has to be done before anything else from |
|
32 | # Install the pyzmq ioloop. This has to be done before anything else from | |
33 | # tornado is imported. |
|
33 | # tornado is imported. | |
34 | from zmq.eventloop import ioloop |
|
34 | from zmq.eventloop import ioloop | |
35 | import tornado.ioloop |
|
35 | ioloop.install() | |
36 | tornado.ioloop.IOLoop = ioloop.IOLoop |
|
|||
37 |
|
36 | |||
38 | from tornado import httpserver |
|
37 | from tornado import httpserver | |
39 | from tornado import web |
|
38 | from tornado import web |
@@ -18,7 +18,6 b' TODO' | |||||
18 | # Standard library imports. |
|
18 | # Standard library imports. | |
19 | import errno |
|
19 | import errno | |
20 | import json |
|
20 | import json | |
21 | from Queue import Queue, Empty |
|
|||
22 | from subprocess import Popen |
|
21 | from subprocess import Popen | |
23 | import os |
|
22 | import os | |
24 | import signal |
|
23 | import signal | |
@@ -28,8 +27,7 b' import time' | |||||
28 |
|
27 | |||
29 | # System library imports. |
|
28 | # System library imports. | |
30 | import zmq |
|
29 | import zmq | |
31 | from zmq import POLLIN, POLLOUT, POLLERR |
|
30 | from zmq.eventloop import ioloop, zmqstream | |
32 | from zmq.eventloop import ioloop |
|
|||
33 |
|
31 | |||
34 | # Local imports. |
|
32 | # Local imports. | |
35 | from IPython.config.loader import Config |
|
33 | from IPython.config.loader import Config | |
@@ -88,7 +86,7 b' class ZMQSocketChannel(Thread):' | |||||
88 | session = None |
|
86 | session = None | |
89 | socket = None |
|
87 | socket = None | |
90 | ioloop = None |
|
88 | ioloop = None | |
91 |
|
|
89 | stream = None | |
92 | _address = None |
|
90 | _address = None | |
93 |
|
91 | |||
94 | def __init__(self, context, session, address): |
|
92 | def __init__(self, context, session, address): | |
@@ -144,37 +142,28 b' class ZMQSocketChannel(Thread):' | |||||
144 | """ |
|
142 | """ | |
145 | return self._address |
|
143 | return self._address | |
146 |
|
144 | |||
147 |
def |
|
145 | def _queue_send(self, msg): | |
148 | """Add IO state to the eventloop. |
|
146 | """Queue a message to be sent from the IOLoop's thread. | |
149 |
|
147 | |||
150 | Parameters |
|
148 | Parameters | |
151 | ---------- |
|
149 | ---------- | |
152 | state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR |
|
150 | msg : message to send | |
153 | The IO state flag to set. |
|
151 | ||
154 |
|
152 | This is threadsafe, as it uses IOLoop.add_callback to give the loop's | ||
155 | This is thread safe as it uses the thread safe IOLoop.add_callback. |
|
153 | thread control of the action. | |
156 | """ |
|
154 | """ | |
157 |
def |
|
155 | def thread_send(): | |
158 | if not self.iostate & state: |
|
156 | self.session.send(self.stream, msg) | |
159 | self.iostate = self.iostate | state |
|
157 | self.ioloop.add_callback(thread_send) | |
160 | self.ioloop.update_handler(self.socket, self.iostate) |
|
|||
161 | self.ioloop.add_callback(add_io_state_callback) |
|
|||
162 |
|
||||
163 | def drop_io_state(self, state): |
|
|||
164 | """Drop IO state from the eventloop. |
|
|||
165 |
|
||||
166 | Parameters |
|
|||
167 | ---------- |
|
|||
168 | state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR |
|
|||
169 | The IO state flag to set. |
|
|||
170 |
|
158 | |||
171 | This is thread safe as it uses the thread safe IOLoop.add_callback. |
|
159 | def _handle_recv(self, msg): | |
|
160 | """callback for stream.on_recv | |||
|
161 | ||||
|
162 | unpacks message, and calls handlers with it. | |||
172 | """ |
|
163 | """ | |
173 | def drop_io_state_callback(): |
|
164 | ident,smsg = self.session.feed_identities(msg) | |
174 | if self.iostate & state: |
|
165 | self.call_handlers(self.session.unserialize(smsg)) | |
175 | self.iostate = self.iostate & (~state) |
|
166 | ||
176 | self.ioloop.update_handler(self.socket, self.iostate) |
|
|||
177 | self.ioloop.add_callback(drop_io_state_callback) |
|
|||
178 |
|
167 | |||
179 |
|
168 | |||
180 | class ShellSocketChannel(ZMQSocketChannel): |
|
169 | class ShellSocketChannel(ZMQSocketChannel): | |
@@ -187,7 +176,6 b' class ShellSocketChannel(ZMQSocketChannel):' | |||||
187 |
|
176 | |||
188 | def __init__(self, context, session, address): |
|
177 | def __init__(self, context, session, address): | |
189 | super(ShellSocketChannel, self).__init__(context, session, address) |
|
178 | super(ShellSocketChannel, self).__init__(context, session, address) | |
190 | self.command_queue = Queue() |
|
|||
191 | self.ioloop = ioloop.IOLoop() |
|
179 | self.ioloop = ioloop.IOLoop() | |
192 |
|
180 | |||
193 | def run(self): |
|
181 | def run(self): | |
@@ -195,9 +183,8 b' class ShellSocketChannel(ZMQSocketChannel):' | |||||
195 | self.socket = self.context.socket(zmq.DEALER) |
|
183 | self.socket = self.context.socket(zmq.DEALER) | |
196 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) |
|
184 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) | |
197 | self.socket.connect('tcp://%s:%i' % self.address) |
|
185 | self.socket.connect('tcp://%s:%i' % self.address) | |
198 | self.iostate = POLLERR|POLLIN |
|
186 | self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) | |
199 | self.ioloop.add_handler(self.socket, self._handle_events, |
|
187 | self.stream.on_recv(self._handle_recv) | |
200 | self.iostate) |
|
|||
201 | self._run_loop() |
|
188 | self._run_loop() | |
202 |
|
189 | |||
203 | def stop(self): |
|
190 | def stop(self): | |
@@ -268,7 +255,7 b' class ShellSocketChannel(ZMQSocketChannel):' | |||||
268 | allow_stdin=allow_stdin, |
|
255 | allow_stdin=allow_stdin, | |
269 | ) |
|
256 | ) | |
270 | msg = self.session.msg('execute_request', content) |
|
257 | msg = self.session.msg('execute_request', content) | |
271 |
self._queue_ |
|
258 | self._queue_send(msg) | |
272 | return msg['header']['msg_id'] |
|
259 | return msg['header']['msg_id'] | |
273 |
|
260 | |||
274 | def complete(self, text, line, cursor_pos, block=None): |
|
261 | def complete(self, text, line, cursor_pos, block=None): | |
@@ -293,7 +280,7 b' class ShellSocketChannel(ZMQSocketChannel):' | |||||
293 | """ |
|
280 | """ | |
294 | content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos) |
|
281 | content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos) | |
295 | msg = self.session.msg('complete_request', content) |
|
282 | msg = self.session.msg('complete_request', content) | |
296 |
self._queue_ |
|
283 | self._queue_send(msg) | |
297 | return msg['header']['msg_id'] |
|
284 | return msg['header']['msg_id'] | |
298 |
|
285 | |||
299 | def object_info(self, oname): |
|
286 | def object_info(self, oname): | |
@@ -310,7 +297,7 b' class ShellSocketChannel(ZMQSocketChannel):' | |||||
310 | """ |
|
297 | """ | |
311 | content = dict(oname=oname) |
|
298 | content = dict(oname=oname) | |
312 | msg = self.session.msg('object_info_request', content) |
|
299 | msg = self.session.msg('object_info_request', content) | |
313 |
self._queue_ |
|
300 | self._queue_send(msg) | |
314 | return msg['header']['msg_id'] |
|
301 | return msg['header']['msg_id'] | |
315 |
|
302 | |||
316 | def history(self, raw=True, output=False, hist_access_type='range', **kwargs): |
|
303 | def history(self, raw=True, output=False, hist_access_type='range', **kwargs): | |
@@ -348,7 +335,7 b' class ShellSocketChannel(ZMQSocketChannel):' | |||||
348 | content = dict(raw=raw, output=output, hist_access_type=hist_access_type, |
|
335 | content = dict(raw=raw, output=output, hist_access_type=hist_access_type, | |
349 | **kwargs) |
|
336 | **kwargs) | |
350 | msg = self.session.msg('history_request', content) |
|
337 | msg = self.session.msg('history_request', content) | |
351 |
self._queue_ |
|
338 | self._queue_send(msg) | |
352 | return msg['header']['msg_id'] |
|
339 | return msg['header']['msg_id'] | |
353 |
|
340 | |||
354 | def shutdown(self, restart=False): |
|
341 | def shutdown(self, restart=False): | |
@@ -365,38 +352,9 b' class ShellSocketChannel(ZMQSocketChannel):' | |||||
365 | # Send quit message to kernel. Once we implement kernel-side setattr, |
|
352 | # Send quit message to kernel. Once we implement kernel-side setattr, | |
366 | # this should probably be done that way, but for now this will do. |
|
353 | # this should probably be done that way, but for now this will do. | |
367 | msg = self.session.msg('shutdown_request', {'restart':restart}) |
|
354 | msg = self.session.msg('shutdown_request', {'restart':restart}) | |
368 |
self._queue_ |
|
355 | self._queue_send(msg) | |
369 | return msg['header']['msg_id'] |
|
356 | return msg['header']['msg_id'] | |
370 |
|
357 | |||
371 | def _handle_events(self, socket, events): |
|
|||
372 | if events & POLLERR: |
|
|||
373 | self._handle_err() |
|
|||
374 | if events & POLLOUT: |
|
|||
375 | self._handle_send() |
|
|||
376 | if events & POLLIN: |
|
|||
377 | self._handle_recv() |
|
|||
378 |
|
||||
379 | def _handle_recv(self): |
|
|||
380 | ident,msg = self.session.recv(self.socket, 0) |
|
|||
381 | self.call_handlers(msg) |
|
|||
382 |
|
||||
383 | def _handle_send(self): |
|
|||
384 | try: |
|
|||
385 | msg = self.command_queue.get(False) |
|
|||
386 | except Empty: |
|
|||
387 | pass |
|
|||
388 | else: |
|
|||
389 | self.session.send(self.socket,msg) |
|
|||
390 | if self.command_queue.empty(): |
|
|||
391 | self.drop_io_state(POLLOUT) |
|
|||
392 |
|
||||
393 | def _handle_err(self): |
|
|||
394 | # We don't want to let this go silently, so eventually we should log. |
|
|||
395 | raise zmq.ZMQError() |
|
|||
396 |
|
||||
397 | def _queue_request(self, msg): |
|
|||
398 | self.command_queue.put(msg) |
|
|||
399 | self.add_io_state(POLLOUT) |
|
|||
400 |
|
358 | |||
401 |
|
359 | |||
402 | class SubSocketChannel(ZMQSocketChannel): |
|
360 | class SubSocketChannel(ZMQSocketChannel): | |
@@ -413,9 +371,8 b' class SubSocketChannel(ZMQSocketChannel):' | |||||
413 | self.socket.setsockopt(zmq.SUBSCRIBE,b'') |
|
371 | self.socket.setsockopt(zmq.SUBSCRIBE,b'') | |
414 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) |
|
372 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) | |
415 | self.socket.connect('tcp://%s:%i' % self.address) |
|
373 | self.socket.connect('tcp://%s:%i' % self.address) | |
416 | self.iostate = POLLIN|POLLERR |
|
374 | self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) | |
417 | self.ioloop.add_handler(self.socket, self._handle_events, |
|
375 | self.stream.on_recv(self._handle_recv) | |
418 | self.iostate) |
|
|||
419 | self._run_loop() |
|
376 | self._run_loop() | |
420 |
|
377 | |||
421 | def stop(self): |
|
378 | def stop(self): | |
@@ -456,33 +413,9 b' class SubSocketChannel(ZMQSocketChannel):' | |||||
456 | while not self._flushed and time.time() < stop_time: |
|
413 | while not self._flushed and time.time() < stop_time: | |
457 | time.sleep(0.01) |
|
414 | time.sleep(0.01) | |
458 |
|
415 | |||
459 | def _handle_events(self, socket, events): |
|
|||
460 | # Turn on and off POLLOUT depending on if we have made a request |
|
|||
461 | if events & POLLERR: |
|
|||
462 | self._handle_err() |
|
|||
463 | if events & POLLIN: |
|
|||
464 | self._handle_recv() |
|
|||
465 |
|
||||
466 | def _handle_err(self): |
|
|||
467 | # We don't want to let this go silently, so eventually we should log. |
|
|||
468 | raise zmq.ZMQError() |
|
|||
469 |
|
||||
470 | def _handle_recv(self): |
|
|||
471 | # Get all of the messages we can |
|
|||
472 | while True: |
|
|||
473 | try: |
|
|||
474 | ident,msg = self.session.recv(self.socket) |
|
|||
475 | except zmq.ZMQError: |
|
|||
476 | # Check the errno? |
|
|||
477 | # Will this trigger POLLERR? |
|
|||
478 | break |
|
|||
479 | else: |
|
|||
480 | if msg is None: |
|
|||
481 | break |
|
|||
482 | self.call_handlers(msg) |
|
|||
483 |
|
||||
484 | def _flush(self): |
|
416 | def _flush(self): | |
485 | """Callback for :method:`self.flush`.""" |
|
417 | """Callback for :method:`self.flush`.""" | |
|
418 | self.stream.flush() | |||
486 | self._flushed = True |
|
419 | self._flushed = True | |
487 |
|
420 | |||
488 |
|
421 | |||
@@ -494,16 +427,14 b' class StdInSocketChannel(ZMQSocketChannel):' | |||||
494 | def __init__(self, context, session, address): |
|
427 | def __init__(self, context, session, address): | |
495 | super(StdInSocketChannel, self).__init__(context, session, address) |
|
428 | super(StdInSocketChannel, self).__init__(context, session, address) | |
496 | self.ioloop = ioloop.IOLoop() |
|
429 | self.ioloop = ioloop.IOLoop() | |
497 | self.msg_queue = Queue() |
|
|||
498 |
|
430 | |||
499 | def run(self): |
|
431 | def run(self): | |
500 | """The thread's main activity. Call start() instead.""" |
|
432 | """The thread's main activity. Call start() instead.""" | |
501 | self.socket = self.context.socket(zmq.DEALER) |
|
433 | self.socket = self.context.socket(zmq.DEALER) | |
502 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) |
|
434 | self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) | |
503 | self.socket.connect('tcp://%s:%i' % self.address) |
|
435 | self.socket.connect('tcp://%s:%i' % self.address) | |
504 | self.iostate = POLLERR|POLLIN |
|
436 | self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) | |
505 | self.ioloop.add_handler(self.socket, self._handle_events, |
|
437 | self.stream.on_recv(self._handle_recv) | |
506 | self.iostate) |
|
|||
507 | self._run_loop() |
|
438 | self._run_loop() | |
508 |
|
439 | |||
509 | def stop(self): |
|
440 | def stop(self): | |
@@ -524,37 +455,7 b' class StdInSocketChannel(ZMQSocketChannel):' | |||||
524 | """Send a string of raw input to the kernel.""" |
|
455 | """Send a string of raw input to the kernel.""" | |
525 | content = dict(value=string) |
|
456 | content = dict(value=string) | |
526 | msg = self.session.msg('input_reply', content) |
|
457 | msg = self.session.msg('input_reply', content) | |
527 |
self._queue_ |
|
458 | self._queue_send(msg) | |
528 |
|
||||
529 | def _handle_events(self, socket, events): |
|
|||
530 | if events & POLLERR: |
|
|||
531 | self._handle_err() |
|
|||
532 | if events & POLLOUT: |
|
|||
533 | self._handle_send() |
|
|||
534 | if events & POLLIN: |
|
|||
535 | self._handle_recv() |
|
|||
536 |
|
||||
537 | def _handle_recv(self): |
|
|||
538 | ident,msg = self.session.recv(self.socket, 0) |
|
|||
539 | self.call_handlers(msg) |
|
|||
540 |
|
||||
541 | def _handle_send(self): |
|
|||
542 | try: |
|
|||
543 | msg = self.msg_queue.get(False) |
|
|||
544 | except Empty: |
|
|||
545 | pass |
|
|||
546 | else: |
|
|||
547 | self.session.send(self.socket,msg) |
|
|||
548 | if self.msg_queue.empty(): |
|
|||
549 | self.drop_io_state(POLLOUT) |
|
|||
550 |
|
||||
551 | def _handle_err(self): |
|
|||
552 | # We don't want to let this go silently, so eventually we should log. |
|
|||
553 | raise zmq.ZMQError() |
|
|||
554 |
|
||||
555 | def _queue_reply(self, msg): |
|
|||
556 | self.msg_queue.put(msg) |
|
|||
557 | self.add_io_state(POLLOUT) |
|
|||
558 |
|
459 | |||
559 |
|
460 | |||
560 | class HBSocketChannel(ZMQSocketChannel): |
|
461 | class HBSocketChannel(ZMQSocketChannel): |
General Comments 0
You need to be logged in to leave comments.
Login now