Show More
@@ -0,0 +1,67 | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """Utility for forwarding file read events over a zmq socket. | |||
|
3 | ||||
|
4 | This is necessary because select on Windows only supports""" | |||
|
5 | ||||
|
6 | #----------------------------------------------------------------------------- | |||
|
7 | # Copyright (C) 2011 The IPython Development Team | |||
|
8 | # | |||
|
9 | # Distributed under the terms of the BSD License. The full license is in | |||
|
10 | # the file COPYING, distributed as part of this software. | |||
|
11 | #----------------------------------------------------------------------------- | |||
|
12 | ||||
|
13 | #----------------------------------------------------------------------------- | |||
|
14 | # Imports | |||
|
15 | #----------------------------------------------------------------------------- | |||
|
16 | ||||
|
17 | import uuid | |||
|
18 | import zmq | |||
|
19 | ||||
|
20 | from threading import Thread | |||
|
21 | ||||
|
22 | #----------------------------------------------------------------------------- | |||
|
23 | # Code | |||
|
24 | #----------------------------------------------------------------------------- | |||
|
25 | ||||
|
26 | class ForwarderThread(Thread): | |||
|
27 | def __init__(self, sock, fd): | |||
|
28 | Thread.__init__(self) | |||
|
29 | self.daemon=True | |||
|
30 | self.sock = sock | |||
|
31 | self.fd = fd | |||
|
32 | ||||
|
33 | def run(self): | |||
|
34 | """loop through lines in self.fd, and send them over self.sock""" | |||
|
35 | line = self.fd.readline() | |||
|
36 | # allow for files opened in unicode mode | |||
|
37 | if isinstance(line, unicode): | |||
|
38 | send = self.sock.send_unicode | |||
|
39 | else: | |||
|
40 | send = self.sock.send | |||
|
41 | while line: | |||
|
42 | send(line) | |||
|
43 | line = self.fd.readline() | |||
|
44 | # line == '' means EOF | |||
|
45 | self.fd.close() | |||
|
46 | self.sock.close() | |||
|
47 | ||||
|
48 | def forward_read_events(fd, context=None): | |||
|
49 | """forward read events from an FD over a socket. | |||
|
50 | ||||
|
51 | This method wraps a file in a socket pair, so it can | |||
|
52 | be polled for read events by select (specifically zmq.eventloop.ioloop) | |||
|
53 | """ | |||
|
54 | if context is None: | |||
|
55 | context = zmq.Context.instance() | |||
|
56 | push = context.socket(zmq.PUSH) | |||
|
57 | push.setsockopt(zmq.LINGER, -1) | |||
|
58 | pull = context.socket(zmq.PULL) | |||
|
59 | addr='inproc://%s'%uuid.uuid4() | |||
|
60 | push.bind(addr) | |||
|
61 | pull.connect(addr) | |||
|
62 | forwarder = ForwarderThread(push, fd) | |||
|
63 | forwarder.start() | |||
|
64 | return pull | |||
|
65 | ||||
|
66 | ||||
|
67 | __all__ = ['forward_read_events'] No newline at end of file |
@@ -48,6 +48,8 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError | |||||
48 |
|
48 | |||
49 | from IPython.parallel.factory import LoggingFactory |
|
49 | from IPython.parallel.factory import LoggingFactory | |
50 |
|
50 | |||
|
51 | from .win32support import forward_read_events | |||
|
52 | ||||
51 | # load winhpcjob only on Windows |
|
53 | # load winhpcjob only on Windows | |
52 | try: |
|
54 | try: | |
53 | from .winhpcjob import ( |
|
55 | from .winhpcjob import ( | |
@@ -57,7 +59,7 try: | |||||
57 | except ImportError: |
|
59 | except ImportError: | |
58 | pass |
|
60 | pass | |
59 |
|
61 | |||
60 |
|
62 | WINDOWS = os.name == 'nt' | ||
61 | #----------------------------------------------------------------------------- |
|
63 | #----------------------------------------------------------------------------- | |
62 | # Paths to the kernel apps |
|
64 | # Paths to the kernel apps | |
63 | #----------------------------------------------------------------------------- |
|
65 | #----------------------------------------------------------------------------- | |
@@ -251,9 +253,14 class LocalProcessLauncher(BaseLauncher): | |||||
251 | env=os.environ, |
|
253 | env=os.environ, | |
252 | cwd=self.work_dir |
|
254 | cwd=self.work_dir | |
253 | ) |
|
255 | ) | |
254 |
|
256 | if WINDOWS: | ||
255 | self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ) |
|
257 | self.stdout = forward_read_events(self.process.stdout) | |
256 | self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ) |
|
258 | self.stderr = forward_read_events(self.process.stderr) | |
|
259 | else: | |||
|
260 | self.stdout = self.process.stdout.fileno() | |||
|
261 | self.stderr = self.process.stderr.fileno() | |||
|
262 | self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ) | |||
|
263 | self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ) | |||
257 | self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop) |
|
264 | self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop) | |
258 | self.poller.start() |
|
265 | self.poller.start() | |
259 | self.notify_start(self.process.pid) |
|
266 | self.notify_start(self.process.pid) | |
@@ -277,7 +284,10 class LocalProcessLauncher(BaseLauncher): | |||||
277 | # callbacks, etc: |
|
284 | # callbacks, etc: | |
278 |
|
285 | |||
279 | def handle_stdout(self, fd, events): |
|
286 | def handle_stdout(self, fd, events): | |
280 | line = self.process.stdout.readline() |
|
287 | if WINDOWS: | |
|
288 | line = self.stdout.recv() | |||
|
289 | else: | |||
|
290 | line = self.process.stdout.readline() | |||
281 | # a stopped process will be readable but return empty strings |
|
291 | # a stopped process will be readable but return empty strings | |
282 | if line: |
|
292 | if line: | |
283 | self.log.info(line[:-1]) |
|
293 | self.log.info(line[:-1]) | |
@@ -285,7 +295,10 class LocalProcessLauncher(BaseLauncher): | |||||
285 | self.poll() |
|
295 | self.poll() | |
286 |
|
296 | |||
287 | def handle_stderr(self, fd, events): |
|
297 | def handle_stderr(self, fd, events): | |
288 | line = self.process.stderr.readline() |
|
298 | if WINDOWS: | |
|
299 | line = self.stderr.recv() | |||
|
300 | else: | |||
|
301 | line = self.process.stderr.readline() | |||
289 | # a stopped process will be readable but return empty strings |
|
302 | # a stopped process will be readable but return empty strings | |
290 | if line: |
|
303 | if line: | |
291 | self.log.error(line[:-1]) |
|
304 | self.log.error(line[:-1]) | |
@@ -296,8 +309,8 class LocalProcessLauncher(BaseLauncher): | |||||
296 | status = self.process.poll() |
|
309 | status = self.process.poll() | |
297 | if status is not None: |
|
310 | if status is not None: | |
298 | self.poller.stop() |
|
311 | self.poller.stop() | |
299 |
self.loop.remove_handler(self. |
|
312 | self.loop.remove_handler(self.stdout) | |
300 |
self.loop.remove_handler(self. |
|
313 | self.loop.remove_handler(self.stderr) | |
301 | self.notify_stop(dict(exit_code=status, pid=self.process.pid)) |
|
314 | self.notify_stop(dict(exit_code=status, pid=self.process.pid)) | |
302 | return status |
|
315 | return status | |
303 |
|
316 | |||
@@ -588,7 +601,7 class SSHEngineSetLauncher(LocalEngineSetLauncher): | |||||
588 |
|
601 | |||
589 | # This is only used on Windows. |
|
602 | # This is only used on Windows. | |
590 | def find_job_cmd(): |
|
603 | def find_job_cmd(): | |
591 | if os.name=='nt': |
|
604 | if WINDOWS: | |
592 | try: |
|
605 | try: | |
593 | return find_cmd('job') |
|
606 | return find_cmd('job') | |
594 | except (FindCmdError, ImportError): |
|
607 | except (FindCmdError, ImportError): |
General Comments 0
You need to be logged in to leave comments.
Login now