|
|
"""wrappers for stdout/stderr forwarding over zmq
|
|
|
"""
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Copyright (C) 2013 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
import os
|
|
|
import threading
|
|
|
import time
|
|
|
import uuid
|
|
|
from io import StringIO, UnsupportedOperation
|
|
|
|
|
|
import zmq
|
|
|
|
|
|
from .session import extract_header
|
|
|
|
|
|
from IPython.utils import py3compat
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Globals
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
# Mode returned by OutStream._check_mp_mode() when the subprocess pipe is
# disabled or the caller runs in the process that created the stream.
MASTER = 0
|
|
|
# Mode returned by OutStream._check_mp_mode() when the subprocess pipe is
# enabled and the caller's pid differs from the pid that created the stream.
CHILD = 1
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Stream classes
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
class OutStream(object):
    """A file like object that publishes the stream to a 0MQ PUB socket.

    Text passed to :meth:`write` is accumulated in an in-memory buffer and
    sent as a ``stream`` message through ``session.send`` when :meth:`flush`
    runs.  When ``pipe`` is enabled, a process whose pid differs from the one
    that created the stream does not touch the PUB socket; it pushes its
    buffered text over a local zmq PUSH/PULL pipe, and the creating process
    collects that text the next time it flushes.
    """

    # Maximum number of subprocess messages drained from the pipe per flush.
    _subprocess_flush_limit = 256
    # The time interval between automatic flushes, in seconds.
    flush_interval = 0.05
    # Overridden per instance in __init__ with b'stream.<name>'.
    topic = None

    def __init__(self, session, pub_socket, name, pipe=True):
        """Create the stream.

        Parameters
        ----------
        session
            Object whose ``send`` method is used to publish stream messages.
        pub_socket
            Socket handed to ``session.send``; its ``context`` is also used
            to create the listening pipe when ``pipe`` is true.
        name
            Stream name; used in the message content and in the topic.
        pipe : bool
            If true, set up the PULL socket that receives output from
            subprocesses.
        """
        self.encoding = 'UTF-8'
        self.session = session
        self.pub_socket = pub_socket
        self.name = name
        self.topic = b'stream.' + py3compat.cast_bytes(name)
        self.parent_header = {}
        self._new_buffer()
        # NOTE: this lock is created here but not acquired by any method
        # of this class.
        self._buffer_lock = threading.Lock()
        # Remember which process/thread created the stream so that later
        # calls can tell whether they run in a subprocess or a sub-thread.
        self._master_pid = os.getpid()
        self._master_thread = threading.current_thread().ident
        self._pipe_pid = os.getpid()
        self._pipe_flag = pipe
        if pipe:
            self._setup_pipe_in()

    def _setup_pipe_in(self):
        """setup listening pipe for subprocesses"""
        ctx = self.pub_socket.context

        # use UUID to authenticate pipe messages
        self._pipe_uuid = uuid.uuid4().bytes

        self._pipe_in = ctx.socket(zmq.PULL)
        self._pipe_in.linger = 0
        self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
        self._pipe_poller = zmq.Poller()
        self._pipe_poller.register(self._pipe_in, zmq.POLLIN)

    def _setup_pipe_out(self):
        """Create the PUSH socket used to send output to the master.

        Also records the current pid as the owner of the outgoing pipe.
        """
        # must be new context after fork
        ctx = zmq.Context()
        self._pipe_pid = os.getpid()
        self._pipe_out = ctx.socket(zmq.PUSH)
        self._pipe_out_lock = threading.Lock()
        self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)

    def _is_master_process(self):
        """Return True if called from the process that created the stream."""
        return os.getpid() == self._master_pid

    def _is_master_thread(self):
        """Return True if called from the thread that created the stream."""
        return threading.current_thread().ident == self._master_thread

    def _have_pipe_out(self):
        """Return True if the recorded pipe pid is the current process."""
        return os.getpid() == self._pipe_pid

    def _check_mp_mode(self):
        """check for forks, and switch to zmq pipeline if necessary

        Returns MASTER when the pipe is disabled or we are in the creating
        process, CHILD otherwise.  The first call in a new process resets
        the buffer and sets up the outgoing pipe.
        """
        if not self._pipe_flag or self._is_master_process():
            return MASTER

        if not self._have_pipe_out():
            # Discard the current buffer contents (presumably text that was
            # buffered before the fork and belongs to the parent).
            self._flush_buffer()
            # setup a new out pipe
            self._setup_pipe_out()
        return CHILD

    def set_parent(self, parent):
        """Store the header extracted from ``parent`` for outgoing messages."""
        self.parent_header = extract_header(parent)

    def close(self):
        """Mark the stream closed; later write/flush calls raise ValueError."""
        self.pub_socket = None

    def _flush_from_subprocesses(self):
        """flush possible pub data from subprocesses into my buffer"""
        if not self._pipe_flag or not self._is_master_process():
            return
        # Bounded loop: drain at most _subprocess_flush_limit messages.
        for _ in range(self._subprocess_flush_limit):
            if not self._pipe_poller.poll(0):
                # nothing waiting on the pipe
                break
            msg = self._pipe_in.recv_multipart()
            if msg[0] != self._pipe_uuid:
                # not authenticated with our UUID: ignore the message
                continue
            self._buffer.write(msg[1].decode(self.encoding, 'replace'))
            # this always means a flush,
            # so reset our timer
            self._start = 0

    def flush(self):
        """trigger actual zmq send

        In the master process (main thread only) this collects subprocess
        output, empties the buffer and publishes its contents as a
        ``stream`` message.  In a child process the buffer is pushed to the
        master over the pipe instead.

        Raises
        ------
        ValueError
            If the stream has been closed.
        """
        if self.pub_socket is None:
            raise ValueError(u'I/O operation on closed file')

        mp_mode = self._check_mp_mode()

        if mp_mode != CHILD:
            # we are master
            if not self._is_master_thread():
                # sub-threads must not trigger flush,
                # but at least they can force the timer.
                self._start = 0
                return

            self._flush_from_subprocesses()
            data = self._flush_buffer()

            if data:
                content = {u'name': self.name, u'data': data}
                self.session.send(self.pub_socket, u'stream', content=content,
                                  parent=self.parent_header, ident=self.topic)

                if hasattr(self.pub_socket, 'flush'):
                    # socket itself has flush (presumably ZMQStream)
                    self.pub_socket.flush()
        else:
            with self._pipe_out_lock:
                string = self._flush_buffer()
                tracker = self._pipe_out.send_multipart([
                    self._pipe_uuid,
                    string.encode(self.encoding, 'replace'),
                ], copy=False, track=True)
                try:
                    # give the message up to one second to be sent
                    tracker.wait(1)
                except Exception:
                    # Best effort: a slow or failed send must not break the
                    # writer.  KeyboardInterrupt/SystemExit still propagate.
                    pass

    def isatty(self):
        """Always False: the stream is never a terminal."""
        return False

    def __next__(self):
        """Reading is unsupported; always raises IOError."""
        raise IOError('Read not supported on a write only stream.')

    if not py3compat.PY3:
        # Python 2 spelling of the iterator protocol method.
        next = __next__

    def read(self, size=-1):
        """Reading is unsupported; always raises IOError."""
        raise IOError('Read not supported on a write only stream.')

    def readline(self, size=-1):
        """Reading is unsupported; always raises IOError."""
        raise IOError('Read not supported on a write only stream.')

    def fileno(self):
        """There is no underlying descriptor; raises UnsupportedOperation."""
        raise UnsupportedOperation("IOStream has no fileno.")

    def write(self, string):
        """Append ``string`` to the buffer, flushing when appropriate.

        Byte strings are decoded with ``self.encoding`` (errors replaced).
        In a child process any write containing a newline flushes
        immediately; in all cases a flush is triggered once more than
        ``flush_interval`` seconds have passed since the timer started.

        Raises
        ------
        ValueError
            If the stream has been closed.
        """
        if self.pub_socket is None:
            raise ValueError('I/O operation on closed file')

        # Make sure that we're handling unicode.
        # ``bytes`` is ``str`` on Python 2, so this matches the old
        # ``not isinstance(string, unicode)`` test there, while also
        # working on Python 3 where the ``unicode`` builtin does not exist.
        if isinstance(string, bytes):
            string = string.decode(self.encoding, 'replace')

        is_child = (self._check_mp_mode() == CHILD)
        self._buffer.write(string)
        if is_child:
            # newlines imply flush in subprocesses
            # mp.Pool cannot be trusted to flush promptly (or ever),
            # and this helps.
            if '\n' in string:
                self.flush()
        # do we want to check subprocess flushes on write?
        # self._flush_from_subprocesses()
        current_time = time.time()
        if self._start < 0:
            # negative means the timer is not running yet: start it
            self._start = current_time
        elif current_time - self._start > self.flush_interval:
            self.flush()

    def writelines(self, sequence):
        """Write every item of ``sequence`` via :meth:`write`.

        Raises
        ------
        ValueError
            If the stream has been closed.
        """
        if self.pub_socket is None:
            raise ValueError('I/O operation on closed file')
        for string in sequence:
            self.write(string)

    def _flush_buffer(self):
        """clear the current buffer and return the current buffer data"""
        data = u''
        if self._buffer is not None:
            data = self._buffer.getvalue()
            self._buffer.close()
        self._new_buffer()
        return data

    def _new_buffer(self):
        """Install an empty buffer and mark the flush timer as not running."""
        self._buffer = StringIO()
        self._start = -1
|
|
|
|