iostream.py
231 lines
| 7.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
r9444 | """wrappers for stdout/stderr forwarding over zmq | ||
""" | ||||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2013 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
Piti Ongmongkolkul
|
r9436 | import os | ||
MinRK
|
r9440 | import threading | ||
MinRK
|
r10193 | import time | ||
MinRK
|
r9440 | import uuid | ||
MinRK
|
r10193 | from io import StringIO, UnsupportedOperation | ||
Brian Granger
|
r2754 | |||
MinRK
|
r9440 | import zmq | ||
Thomas Kluyver
|
r13347 | from .session import extract_header | ||
Brian Granger
|
r2754 | |||
Bradley M. Froehle
|
r7847 | from IPython.utils import py3compat | ||
Thomas Kluyver
|
r13353 | from IPython.utils.py3compat import unicode_type | ||
MinRK
|
r13825 | from IPython.utils.warn import warn | ||
Fernando Perez
|
r2938 | |||
Brian Granger
|
r2754 | #----------------------------------------------------------------------------- | ||
Fernando Perez
|
r3322 | # Globals | ||
Brian Granger
|
r2754 | #----------------------------------------------------------------------------- | ||
Fernando Perez
|
r3322 | |||
MinRK
|
r9444 | MASTER = 0 | ||
CHILD = 1 | ||||
MinRK
|
r9440 | |||
Fernando Perez
|
r3322 | #----------------------------------------------------------------------------- | ||
Brian Granger
|
r2754 | # Stream classes | ||
#----------------------------------------------------------------------------- | ||||
class OutStream(object): | ||||
"""A file like object that publishes the stream to a 0MQ PUB socket.""" | ||||
# The time interval between automatic flushes, in seconds. | ||||
MinRK
|
r9452 | _subprocess_flush_limit = 256 | ||
Brian Granger
|
r2754 | flush_interval = 0.05 | ||
MinRK
|
r3602 | topic=None | ||
MinRK
|
r9447 | def __init__(self, session, pub_socket, name, pipe=True): | ||
Bradley M. Froehle
|
r8595 | self.encoding = 'UTF-8' | ||
Brian Granger
|
r2754 | self.session = session | ||
self.pub_socket = pub_socket | ||||
self.name = name | ||||
MinRK
|
r11697 | self.topic = b'stream.' + py3compat.cast_bytes(name) | ||
Brian Granger
|
r2754 | self.parent_header = {} | ||
self._new_buffer() | ||||
MinRK
|
r9440 | self._buffer_lock = threading.Lock() | ||
self._master_pid = os.getpid() | ||||
self._master_thread = threading.current_thread().ident | ||||
self._pipe_pid = os.getpid() | ||||
MinRK
|
r9447 | self._pipe_flag = pipe | ||
if pipe: | ||||
self._setup_pipe_in() | ||||
MinRK
|
r9440 | |||
def _setup_pipe_in(self): | ||||
"""setup listening pipe for subprocesses""" | ||||
MinRK
|
r9443 | ctx = self.pub_socket.context | ||
MinRK
|
r9440 | |||
# use UUID to authenticate pipe messages | ||||
self._pipe_uuid = uuid.uuid4().bytes | ||||
MinRK
|
r9443 | self._pipe_in = ctx.socket(zmq.PULL) | ||
self._pipe_in.linger = 0 | ||||
MinRK
|
r13825 | try: | ||
self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1") | ||||
except zmq.ZMQError as e: | ||||
warn("Couldn't bind IOStream to 127.0.0.1: %s" % e + | ||||
"\nsubprocess output will be unavailable." | ||||
) | ||||
self._pipe_flag = False | ||||
self._pipe_in.close() | ||||
del self._pipe_in | ||||
return | ||||
MinRK
|
r9443 | self._pipe_poller = zmq.Poller() | ||
self._pipe_poller.register(self._pipe_in, zmq.POLLIN) | ||||
MinRK
|
r9440 | |||
def _setup_pipe_out(self): | ||||
# must be new context after fork | ||||
ctx = zmq.Context() | ||||
self._pipe_pid = os.getpid() | ||||
self._pipe_out = ctx.socket(zmq.PUSH) | ||||
self._pipe_out_lock = threading.Lock() | ||||
self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port) | ||||
Piti Ongmongkolkul
|
r9436 | def _is_master_process(self): | ||
MinRK
|
r9440 | return os.getpid() == self._master_pid | ||
def _is_master_thread(self): | ||||
return threading.current_thread().ident == self._master_thread | ||||
def _have_pipe_out(self): | ||||
return os.getpid() == self._pipe_pid | ||||
Piti Ongmongkolkul
|
r9436 | |||
def _check_mp_mode(self): | ||||
MinRK
|
r9440 | """check for forks, and switch to zmq pipeline if necessary""" | ||
MinRK
|
r9447 | if not self._pipe_flag or self._is_master_process(): | ||
MinRK
|
r13825 | return MASTER | ||
MinRK
|
r9440 | else: | ||
if not self._have_pipe_out(): | ||||
MinRK
|
r9451 | self._flush_buffer() | ||
MinRK
|
r9440 | # setup a new out pipe | ||
self._setup_pipe_out() | ||||
return CHILD | ||||
Brian Granger
|
r2754 | |||
def set_parent(self, parent): | ||||
self.parent_header = extract_header(parent) | ||||
def close(self): | ||||
self.pub_socket = None | ||||
MinRK
|
r9443 | def _flush_from_subprocesses(self): | ||
"""flush possible pub data from subprocesses into my buffer""" | ||||
MinRK
|
r9447 | if not self._pipe_flag or not self._is_master_process(): | ||
MinRK
|
r9443 | return | ||
MinRK
|
r9452 | for i in range(self._subprocess_flush_limit): | ||
MinRK
|
r9443 | if self._pipe_poller.poll(0): | ||
msg = self._pipe_in.recv_multipart() | ||||
if msg[0] != self._pipe_uuid: | ||||
continue | ||||
else: | ||||
self._buffer.write(msg[1].decode(self.encoding, 'replace')) | ||||
# this always means a flush, | ||||
# so reset our timer | ||||
self._start = 0 | ||||
else: | ||||
break | ||||
Brian Granger
|
r2754 | def flush(self): | ||
MinRK
|
r9440 | """trigger actual zmq send""" | ||
Brian Granger
|
r2754 | if self.pub_socket is None: | ||
raise ValueError(u'I/O operation on closed file') | ||||
MinRK
|
r9443 | |||
mp_mode = self._check_mp_mode() | ||||
if mp_mode != CHILD: | ||||
# we are master | ||||
if not self._is_master_thread(): | ||||
# sub-threads must not trigger flush, | ||||
# but at least they can force the timer. | ||||
self._start = 0 | ||||
return | ||||
self._flush_from_subprocesses() | ||||
data = self._flush_buffer() | ||||
if data: | ||||
content = {u'name':self.name, u'data':data} | ||||
msg = self.session.send(self.pub_socket, u'stream', content=content, | ||||
parent=self.parent_header, ident=self.topic) | ||||
if hasattr(self.pub_socket, 'flush'): | ||||
# socket itself has flush (presumably ZMQStream) | ||||
self.pub_socket.flush() | ||||
Brian Granger
|
r2754 | else: | ||
MinRK
|
r9443 | with self._pipe_out_lock: | ||
string = self._flush_buffer() | ||||
tracker = self._pipe_out.send_multipart([ | ||||
self._pipe_uuid, | ||||
string.encode(self.encoding, 'replace'), | ||||
], copy=False, track=True) | ||||
MinRK
|
r9453 | try: | ||
tracker.wait(1) | ||||
except: | ||||
pass | ||||
Brian Granger
|
r2754 | |||
def isatty(self): | ||||
return False | ||||
Bradley M. Froehle
|
r7847 | def __next__(self): | ||
Brian Granger
|
r2754 | raise IOError('Read not supported on a write only stream.') | ||
Bradley M. Froehle
|
r7847 | if not py3compat.PY3: | ||
next = __next__ | ||||
Brian Granger
|
r2754 | def read(self, size=-1): | ||
raise IOError('Read not supported on a write only stream.') | ||||
def readline(self, size=-1): | ||||
raise IOError('Read not supported on a write only stream.') | ||||
MinRK
|
r10193 | |||
def fileno(self): | ||||
raise UnsupportedOperation("IOStream has no fileno.") | ||||
Brian Granger
|
r2754 | |||
def write(self, string): | ||||
if self.pub_socket is None: | ||||
raise ValueError('I/O operation on closed file') | ||||
else: | ||||
Thomas Kluyver
|
r4070 | # Make sure that we're handling unicode | ||
Thomas Kluyver
|
r13353 | if not isinstance(string, unicode_type): | ||
Bradley M. Froehle
|
r8595 | string = string.decode(self.encoding, 'replace') | ||
MinRK
|
r9451 | |||
is_child = (self._check_mp_mode() == CHILD) | ||||
MinRK
|
r9443 | self._buffer.write(string) | ||
MinRK
|
r9451 | if is_child: | ||
# newlines imply flush in subprocesses | ||||
# mp.Pool cannot be trusted to flush promptly (or ever), | ||||
# and this helps. | ||||
if '\n' in string: | ||||
self.flush() | ||||
MinRK
|
r9443 | # do we want to check subprocess flushes on write? | ||
# self._flush_from_subprocesses() | ||||
Brian Granger
|
r2754 | current_time = time.time() | ||
MinRK
|
r9440 | if self._start < 0: | ||
Brian Granger
|
r2754 | self._start = current_time | ||
elif current_time - self._start > self.flush_interval: | ||||
self.flush() | ||||
def writelines(self, sequence): | ||||
if self.pub_socket is None: | ||||
raise ValueError('I/O operation on closed file') | ||||
else: | ||||
for string in sequence: | ||||
self.write(string) | ||||
MinRK
|
r9443 | def _flush_buffer(self): | ||
"""clear the current buffer and return the current buffer data""" | ||||
data = u'' | ||||
if self._buffer is not None: | ||||
data = self._buffer.getvalue() | ||||
self._buffer.close() | ||||
self._new_buffer() | ||||
return data | ||||
Brian Granger
|
r2754 | def _new_buffer(self): | ||
self._buffer = StringIO() | ||||
Fernando Perez
|
r2938 | self._start = -1 | ||