diff --git a/IPython/parallel/apps/launcher.py b/IPython/parallel/apps/launcher.py index 1e76ed1..53671ec 100644 --- a/IPython/parallel/apps/launcher.py +++ b/IPython/parallel/apps/launcher.py @@ -48,6 +48,8 @@ from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError from IPython.parallel.factory import LoggingFactory +from .win32support import forward_read_events + # load winhpcjob only on Windows try: from .winhpcjob import ( @@ -57,7 +59,7 @@ try: except ImportError: pass - +WINDOWS = os.name == 'nt' #----------------------------------------------------------------------------- # Paths to the kernel apps #----------------------------------------------------------------------------- @@ -251,9 +253,14 @@ class LocalProcessLauncher(BaseLauncher): env=os.environ, cwd=self.work_dir ) - - self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ) - self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ) + if WINDOWS: + self.stdout = forward_read_events(self.process.stdout) + self.stderr = forward_read_events(self.process.stderr) + else: + self.stdout = self.process.stdout.fileno() + self.stderr = self.process.stderr.fileno() + self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ) + self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ) self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop) self.poller.start() self.notify_start(self.process.pid) @@ -277,7 +284,10 @@ class LocalProcessLauncher(BaseLauncher): # callbacks, etc: def handle_stdout(self, fd, events): - line = self.process.stdout.readline() + if WINDOWS: + line = self.stdout.recv() + else: + line = self.process.stdout.readline() # a stopped process will be readable but return empty strings if line: self.log.info(line[:-1]) @@ -285,7 +295,10 @@ class LocalProcessLauncher(BaseLauncher): self.poll() def handle_stderr(self, fd, events): - line = self.process.stderr.readline() + if WINDOWS: + line = self.stderr.recv() + else: + line = self.process.stderr.readline() # a stopped process will be readable but return empty strings if line: self.log.error(line[:-1]) @@ -296,8 +309,8 @@ class LocalProcessLauncher(BaseLauncher): status = self.process.poll() if status is not None: self.poller.stop() - self.loop.remove_handler(self.process.stdout.fileno()) - self.loop.remove_handler(self.process.stderr.fileno()) + self.loop.remove_handler(self.stdout) + self.loop.remove_handler(self.stderr) self.notify_stop(dict(exit_code=status, pid=self.process.pid)) return status @@ -588,7 +601,7 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): # This is only used on Windows. def find_job_cmd(): - if os.name=='nt': + if WINDOWS: try: return find_cmd('job') except (FindCmdError, ImportError): diff --git a/IPython/parallel/apps/win32support.py b/IPython/parallel/apps/win32support.py new file mode 100644 index 0000000..9d97225 --- /dev/null +++ b/IPython/parallel/apps/win32support.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python +"""Utility for forwarding file read events over a zmq socket. + +This is necessary because select on Windows only supports""" + +#----------------------------------------------------------------------------- +# Copyright (C) 2011 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +import uuid +import zmq + +from threading import Thread + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + +class ForwarderThread(Thread): + def __init__(self, sock, fd): + Thread.__init__(self) + self.daemon=True + self.sock = sock + self.fd = fd + + def run(self): + """loop through lines in self.fd, and send them over self.sock""" + line = self.fd.readline() + # allow for files opened in unicode mode + if isinstance(line, unicode): + send = self.sock.send_unicode + else: + send = self.sock.send + while line: + send(line) + line = self.fd.readline() + # line == '' means EOF + self.fd.close() + self.sock.close() + +def forward_read_events(fd, context=None): + """forward read events from an FD over a socket. + + This method wraps a file in a socket pair, so it can + be polled for read events by select (specifically zmq.eventloop.ioloop) + """ + if context is None: + context = zmq.Context.instance() + push = context.socket(zmq.PUSH) + push.setsockopt(zmq.LINGER, -1) + pull = context.socket(zmq.PULL) + addr='inproc://%s'%uuid.uuid4() + push.bind(addr) + pull.connect(addr) + forwarder = ForwarderThread(push, fd) + forwarder.start() + return pull + + +__all__ = ['forward_read_events'] \ No newline at end of file