##// END OF EJS Templates
forward subprocess IO over zmq on Windows...
MinRK -
Show More
@@ -0,0 +1,67 b''
1 #!/usr/bin/env python
2 """Utility for forwarding file read events over a zmq socket.
3
4 This is necessary because select on Windows only supports"""
5
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2011 The IPython Development Team
8 #
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
12
13 #-----------------------------------------------------------------------------
14 # Imports
15 #-----------------------------------------------------------------------------
16
17 import uuid
18 import zmq
19
20 from threading import Thread
21
22 #-----------------------------------------------------------------------------
23 # Code
24 #-----------------------------------------------------------------------------
25
26 class ForwarderThread(Thread):
27 def __init__(self, sock, fd):
28 Thread.__init__(self)
29 self.daemon=True
30 self.sock = sock
31 self.fd = fd
32
33 def run(self):
34 """loop through lines in self.fd, and send them over self.sock"""
35 line = self.fd.readline()
36 # allow for files opened in unicode mode
37 if isinstance(line, unicode):
38 send = self.sock.send_unicode
39 else:
40 send = self.sock.send
41 while line:
42 send(line)
43 line = self.fd.readline()
44 # line == '' means EOF
45 self.fd.close()
46 self.sock.close()
47
48 def forward_read_events(fd, context=None):
49 """forward read events from an FD over a socket.
50
51 This method wraps a file in a socket pair, so it can
52 be polled for read events by select (specifically zmq.eventloop.ioloop)
53 """
54 if context is None:
55 context = zmq.Context.instance()
56 push = context.socket(zmq.PUSH)
57 push.setsockopt(zmq.LINGER, -1)
58 pull = context.socket(zmq.PULL)
59 addr='inproc://%s'%uuid.uuid4()
60 push.bind(addr)
61 pull.connect(addr)
62 forwarder = ForwarderThread(push, fd)
63 forwarder.start()
64 return pull
65
66
67 __all__ = ['forward_read_events'] No newline at end of file
@@ -48,6 +48,8 b' from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError'
48
48
49 from IPython.parallel.factory import LoggingFactory
49 from IPython.parallel.factory import LoggingFactory
50
50
51 from .win32support import forward_read_events
52
51 # load winhpcjob only on Windows
53 # load winhpcjob only on Windows
52 try:
54 try:
53 from .winhpcjob import (
55 from .winhpcjob import (
@@ -57,7 +59,7 b' try:'
57 except ImportError:
59 except ImportError:
58 pass
60 pass
59
61
60
62 WINDOWS = os.name == 'nt'
61 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
62 # Paths to the kernel apps
64 # Paths to the kernel apps
63 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
@@ -251,9 +253,14 b' class LocalProcessLauncher(BaseLauncher):'
251 env=os.environ,
253 env=os.environ,
252 cwd=self.work_dir
254 cwd=self.work_dir
253 )
255 )
254
256 if WINDOWS:
255 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
257 self.stdout = forward_read_events(self.process.stdout)
256 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
258 self.stderr = forward_read_events(self.process.stderr)
259 else:
260 self.stdout = self.process.stdout.fileno()
261 self.stderr = self.process.stderr.fileno()
262 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
263 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
257 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
264 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
258 self.poller.start()
265 self.poller.start()
259 self.notify_start(self.process.pid)
266 self.notify_start(self.process.pid)
@@ -277,7 +284,10 b' class LocalProcessLauncher(BaseLauncher):'
277 # callbacks, etc:
284 # callbacks, etc:
278
285
279 def handle_stdout(self, fd, events):
286 def handle_stdout(self, fd, events):
280 line = self.process.stdout.readline()
287 if WINDOWS:
288 line = self.stdout.recv()
289 else:
290 line = self.process.stdout.readline()
281 # a stopped process will be readable but return empty strings
291 # a stopped process will be readable but return empty strings
282 if line:
292 if line:
283 self.log.info(line[:-1])
293 self.log.info(line[:-1])
@@ -285,7 +295,10 b' class LocalProcessLauncher(BaseLauncher):'
285 self.poll()
295 self.poll()
286
296
287 def handle_stderr(self, fd, events):
297 def handle_stderr(self, fd, events):
288 line = self.process.stderr.readline()
298 if WINDOWS:
299 line = self.stderr.recv()
300 else:
301 line = self.process.stderr.readline()
289 # a stopped process will be readable but return empty strings
302 # a stopped process will be readable but return empty strings
290 if line:
303 if line:
291 self.log.error(line[:-1])
304 self.log.error(line[:-1])
@@ -296,8 +309,8 b' class LocalProcessLauncher(BaseLauncher):'
296 status = self.process.poll()
309 status = self.process.poll()
297 if status is not None:
310 if status is not None:
298 self.poller.stop()
311 self.poller.stop()
299 self.loop.remove_handler(self.process.stdout.fileno())
312 self.loop.remove_handler(self.stdout)
300 self.loop.remove_handler(self.process.stderr.fileno())
313 self.loop.remove_handler(self.stderr)
301 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
314 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
302 return status
315 return status
303
316
@@ -588,7 +601,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
588
601
589 # This is only used on Windows.
602 # This is only used on Windows.
590 def find_job_cmd():
603 def find_job_cmd():
591 if os.name=='nt':
604 if WINDOWS:
592 try:
605 try:
593 return find_cmd('job')
606 return find_cmd('job')
594 except (FindCmdError, ImportError):
607 except (FindCmdError, ImportError):
General Comments 0
You need to be logged in to leave comments. Login now