Show More
@@ -0,0 +1,67 | |||
|
1 | #!/usr/bin/env python | |
|
2 | """Utility for forwarding file read events over a zmq socket. | |
|
3 | ||
|
4 | This is necessary because select on Windows only supports""" | |
|
5 | ||
|
6 | #----------------------------------------------------------------------------- | |
|
7 | # Copyright (C) 2011 The IPython Development Team | |
|
8 | # | |
|
9 | # Distributed under the terms of the BSD License. The full license is in | |
|
10 | # the file COPYING, distributed as part of this software. | |
|
11 | #----------------------------------------------------------------------------- | |
|
12 | ||
|
13 | #----------------------------------------------------------------------------- | |
|
14 | # Imports | |
|
15 | #----------------------------------------------------------------------------- | |
|
16 | ||
|
17 | import uuid | |
|
18 | import zmq | |
|
19 | ||
|
20 | from threading import Thread | |
|
21 | ||
|
22 | #----------------------------------------------------------------------------- | |
|
23 | # Code | |
|
24 | #----------------------------------------------------------------------------- | |
|
25 | ||
|
26 | class ForwarderThread(Thread): | |
|
27 | def __init__(self, sock, fd): | |
|
28 | Thread.__init__(self) | |
|
29 | self.daemon=True | |
|
30 | self.sock = sock | |
|
31 | self.fd = fd | |
|
32 | ||
|
33 | def run(self): | |
|
34 | """loop through lines in self.fd, and send them over self.sock""" | |
|
35 | line = self.fd.readline() | |
|
36 | # allow for files opened in unicode mode | |
|
37 | if isinstance(line, unicode): | |
|
38 | send = self.sock.send_unicode | |
|
39 | else: | |
|
40 | send = self.sock.send | |
|
41 | while line: | |
|
42 | send(line) | |
|
43 | line = self.fd.readline() | |
|
44 | # line == '' means EOF | |
|
45 | self.fd.close() | |
|
46 | self.sock.close() | |
|
47 | ||
|
48 | def forward_read_events(fd, context=None): | |
|
49 | """forward read events from an FD over a socket. | |
|
50 | ||
|
51 | This method wraps a file in a socket pair, so it can | |
|
52 | be polled for read events by select (specifically zmq.eventloop.ioloop) | |
|
53 | """ | |
|
54 | if context is None: | |
|
55 | context = zmq.Context.instance() | |
|
56 | push = context.socket(zmq.PUSH) | |
|
57 | push.setsockopt(zmq.LINGER, -1) | |
|
58 | pull = context.socket(zmq.PULL) | |
|
59 | addr='inproc://%s'%uuid.uuid4() | |
|
60 | push.bind(addr) | |
|
61 | pull.connect(addr) | |
|
62 | forwarder = ForwarderThread(push, fd) | |
|
63 | forwarder.start() | |
|
64 | return pull | |
|
65 | ||
|
66 | ||
|
67 | __all__ = ['forward_read_events'] No newline at end of file |
@@ -48,6 +48,8 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError | |||
|
48 | 48 | |
|
49 | 49 | from IPython.parallel.factory import LoggingFactory |
|
50 | 50 | |
|
51 | from .win32support import forward_read_events | |
|
52 | ||
|
51 | 53 | # load winhpcjob only on Windows |
|
52 | 54 | try: |
|
53 | 55 | from .winhpcjob import ( |
@@ -57,7 +59,7 try: | |||
|
57 | 59 | except ImportError: |
|
58 | 60 | pass |
|
59 | 61 | |
|
60 | ||
|
62 | WINDOWS = os.name == 'nt' | |
|
61 | 63 | #----------------------------------------------------------------------------- |
|
62 | 64 | # Paths to the kernel apps |
|
63 | 65 | #----------------------------------------------------------------------------- |
@@ -251,9 +253,14 class LocalProcessLauncher(BaseLauncher): | |||
|
251 | 253 | env=os.environ, |
|
252 | 254 | cwd=self.work_dir |
|
253 | 255 | ) |
|
254 | ||
|
255 | self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ) | |
|
256 | self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ) | |
|
256 | if WINDOWS: | |
|
257 | self.stdout = forward_read_events(self.process.stdout) | |
|
258 | self.stderr = forward_read_events(self.process.stderr) | |
|
259 | else: | |
|
260 | self.stdout = self.process.stdout.fileno() | |
|
261 | self.stderr = self.process.stderr.fileno() | |
|
262 | self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ) | |
|
263 | self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ) | |
|
257 | 264 | self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop) |
|
258 | 265 | self.poller.start() |
|
259 | 266 | self.notify_start(self.process.pid) |
@@ -277,7 +284,10 class LocalProcessLauncher(BaseLauncher): | |||
|
277 | 284 | # callbacks, etc: |
|
278 | 285 | |
|
279 | 286 | def handle_stdout(self, fd, events): |
|
280 | line = self.process.stdout.readline() | |
|
287 | if WINDOWS: | |
|
288 | line = self.stdout.recv() | |
|
289 | else: | |
|
290 | line = self.process.stdout.readline() | |
|
281 | 291 | # a stopped process will be readable but return empty strings |
|
282 | 292 | if line: |
|
283 | 293 | self.log.info(line[:-1]) |
@@ -285,7 +295,10 class LocalProcessLauncher(BaseLauncher): | |||
|
285 | 295 | self.poll() |
|
286 | 296 | |
|
287 | 297 | def handle_stderr(self, fd, events): |
|
288 | line = self.process.stderr.readline() | |
|
298 | if WINDOWS: | |
|
299 | line = self.stderr.recv() | |
|
300 | else: | |
|
301 | line = self.process.stderr.readline() | |
|
289 | 302 | # a stopped process will be readable but return empty strings |
|
290 | 303 | if line: |
|
291 | 304 | self.log.error(line[:-1]) |
@@ -296,8 +309,8 class LocalProcessLauncher(BaseLauncher): | |||
|
296 | 309 | status = self.process.poll() |
|
297 | 310 | if status is not None: |
|
298 | 311 | self.poller.stop() |
|
299 |
self.loop.remove_handler(self. |
|
|
300 |
self.loop.remove_handler(self. |
|
|
312 | self.loop.remove_handler(self.stdout) | |
|
313 | self.loop.remove_handler(self.stderr) | |
|
301 | 314 | self.notify_stop(dict(exit_code=status, pid=self.process.pid)) |
|
302 | 315 | return status |
|
303 | 316 | |
@@ -588,7 +601,7 class SSHEngineSetLauncher(LocalEngineSetLauncher): | |||
|
588 | 601 | |
|
589 | 602 | # This is only used on Windows. |
|
590 | 603 | def find_job_cmd(): |
|
591 | if os.name=='nt': | |
|
604 | if WINDOWS: | |
|
592 | 605 | try: |
|
593 | 606 | return find_cmd('job') |
|
594 | 607 | except (FindCmdError, ImportError): |
General Comments 0
You need to be logged in to leave comments.
Login now