##// END OF EJS Templates
Merge pull request #5810 from minrk/clean-comm-msg...
Merge pull request #5810 from minrk/clean-comm-msg apply json_clean to comm_msg content

File last commit:

r16360:61474647
r16642:e0c5da18 merge
Show More
iostream.py
238 lines | 7.9 KiB | text/x-python | PythonLexer
MinRK
schedule async flush...
r16360 """Wrappers for forwarding stdout/stderr over zmq"""
MinRK
cleanup unused symbols in iostream...
r9444
MinRK
schedule async flush...
r16360 # Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
MinRK
cleanup unused symbols in iostream...
r9444
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 import os
MinRK
use zmq to protect subprocess stdout after fork
r9440 import threading
MinRK
raise UnsupportedOperation on iostream.fileno()...
r10193 import time
MinRK
use zmq to protect subprocess stdout after fork
r9440 import uuid
MinRK
raise UnsupportedOperation on iostream.fileno()...
r10193 from io import StringIO, UnsupportedOperation
Brian Granger
Separating kernel into smaller pieces.
r2754
MinRK
use zmq to protect subprocess stdout after fork
r9440 import zmq
MinRK
schedule async flush...
r16360 from zmq.eventloop.ioloop import IOLoop
MinRK
use zmq to protect subprocess stdout after fork
r9440
Thomas Kluyver
Use explicit relative imports...
r13347 from .session import extract_header
Brian Granger
Separating kernel into smaller pieces.
r2754
Bradley M. Froehle
Apply 2to3 `next` fix....
r7847 from IPython.utils import py3compat
Thomas Kluyver
Replace references to unicode and basestring
r13353 from IPython.utils.py3compat import unicode_type
MinRK
survive failure to bind to localhost in zmq.iostream...
r13825 from IPython.utils.warn import warn
Fernando Perez
Add missing flush of output streams on execute
r2938
Brian Granger
Separating kernel into smaller pieces.
r2754 #-----------------------------------------------------------------------------
Fernando Perez
Merge branch 'kernel-logging' of https://github.com/omazapa/ipython into omazapa-kernel-logging...
r3322 # Globals
Brian Granger
Separating kernel into smaller pieces.
r2754 #-----------------------------------------------------------------------------
Fernando Perez
Merge branch 'kernel-logging' of https://github.com/omazapa/ipython into omazapa-kernel-logging...
r3322
MinRK
cleanup unused symbols in iostream...
# Process-mode flags returned by OutStream._check_mp_mode():
# MASTER - running in the process that created the stream (or the
#          subprocess pipe is disabled), so output is published directly.
# CHILD  - running in a forked subprocess, so output is forwarded to the
#          master over the zmq PUSH/PULL pipe.
MASTER = 0
CHILD = 1
MinRK
use zmq to protect subprocess stdout after fork
r9440
Fernando Perez
Merge branch 'kernel-logging' of https://github.com/omazapa/ipython into omazapa-kernel-logging...
r3322 #-----------------------------------------------------------------------------
Brian Granger
Separating kernel into smaller pieces.
r2754 # Stream classes
#-----------------------------------------------------------------------------
class OutStream(object):
    """A file like object that publishes the stream to a 0MQ PUB socket.

    Text passed to :meth:`write` is accumulated in an in-memory buffer and
    sent as a ``stream`` message when :meth:`flush` runs.  When the subprocess
    pipe is enabled, forked children do not publish directly: they push their
    buffered text to the creating process over a local zmq PUSH/PULL pipe,
    and the creating process merges it into its own buffer on flush.
    """

    # Maximum number of subprocess messages merged into the buffer per flush.
    _subprocess_flush_limit = 256
    # The time interval between automatic flushes, in seconds.
    flush_interval = 0.05
    # Class-level default; replaced per instance in __init__.
    topic = None

    def __init__(self, session, pub_socket, name, pipe=True):
        """Create the stream.

        Parameters
        ----------
        session : object
            Used as ``session.send(pub_socket, u'stream', ...)`` on flush.
        pub_socket : object
            Socket the stream messages are sent on; its ``context`` is used
            to create the subprocess pipe.
        name : str
            Stream name; sent as the ``name`` field of each message and used
            to build the ``stream.<name>`` topic.
        pipe : bool
            If true, set up the PULL socket that receives subprocess output.
        """
        self.encoding = 'UTF-8'
        self.session = session
        self.pub_socket = pub_socket
        self.name = name
        self.topic = b'stream.' + py3compat.cast_bytes(name)
        self.parent_header = {}
        self._new_buffer()
        self._buffer_lock = threading.Lock()
        # Remember which process/thread created us so that forks and
        # sub-threads can be detected later.
        self._master_pid = os.getpid()
        self._master_thread = threading.current_thread().ident
        self._pipe_pid = os.getpid()
        self._pipe_flag = pipe
        if pipe:
            self._setup_pipe_in()

    def _setup_pipe_in(self):
        """Set up the listening PULL socket for subprocess output.

        If binding to 127.0.0.1 fails, a warning is issued and the subprocess
        pipe is disabled (``_pipe_flag`` becomes False).
        """
        ctx = self.pub_socket.context

        # use UUID to authenticate pipe messages
        self._pipe_uuid = uuid.uuid4().bytes

        self._pipe_in = ctx.socket(zmq.PULL)
        self._pipe_in.linger = 0
        try:
            self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
        except zmq.ZMQError as e:
            warn("Couldn't bind IOStream to 127.0.0.1: %s" % e +
                 "\nsubprocess output will be unavailable."
                 )
            self._pipe_flag = False
            self._pipe_in.close()
            del self._pipe_in
            return
        self._pipe_poller = zmq.Poller()
        self._pipe_poller.register(self._pipe_in, zmq.POLLIN)

    def _setup_pipe_out(self):
        """Create the PUSH socket a forked child uses to reach the master."""
        # must be new context after fork
        ctx = zmq.Context()
        # Record the pid that owns this pipe so a further fork is detected.
        self._pipe_pid = os.getpid()
        self._pipe_out = ctx.socket(zmq.PUSH)
        self._pipe_out_lock = threading.Lock()
        self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)

    def _is_master_process(self):
        """Return True when running in the process that created the stream."""
        return os.getpid() == self._master_pid

    def _is_master_thread(self):
        """Return True when running in the thread that created the stream."""
        return threading.current_thread().ident == self._master_thread

    def _have_pipe_out(self):
        """Return True when the current process already owns an out pipe."""
        return os.getpid() == self._pipe_pid

    def _check_mp_mode(self):
        """Check for forks, and switch to the zmq pipeline if necessary.

        Returns MASTER when the pipe is disabled or we are in the creating
        process, CHILD otherwise.
        """
        if not self._pipe_flag or self._is_master_process():
            return MASTER

        if not self._have_pipe_out():
            # First use in this process: discard the buffer contents that
            # were present before this check, then set up a new out pipe.
            self._flush_buffer()
            self._setup_pipe_out()
        return CHILD

    def set_parent(self, parent):
        """Store the header extracted from *parent* for outgoing messages."""
        self.parent_header = extract_header(parent)

    def close(self):
        """Mark the stream closed; later write/flush calls raise ValueError."""
        self.pub_socket = None

    def _flush_from_subprocesses(self):
        """Flush possible pub data from subprocesses into my buffer.

        Does nothing when the pipe is disabled or when not called in the
        creating process.  At most ``_subprocess_flush_limit`` messages are
        read per call.
        """
        if not self._pipe_flag or not self._is_master_process():
            return
        for _ in range(self._subprocess_flush_limit):
            if not self._pipe_poller.poll(0):
                # nothing more waiting on the pipe
                break
            msg = self._pipe_in.recv_multipart()
            if msg[0] != self._pipe_uuid:
                # first frame is not our UUID: ignore the message
                continue
            self._buffer.write(msg[1].decode(self.encoding, 'replace'))
            # this always means a flush,
            # so reset our timer
            self._start = 0

    def _schedule_flush(self):
        """Schedule a flush in the main thread.

        Only works with a tornado/pyzmq eventloop running; otherwise the
        flush timer is forced so that the next timed check in write() flushes.
        """
        if IOLoop.initialized():
            IOLoop.instance().add_callback(self.flush)
        else:
            # no async loop, at least force the timer
            self._start = 0

    def flush(self):
        """Trigger the actual zmq send.

        In the creating process the buffer (plus any pending subprocess
        output) is sent as a ``stream`` message; in a forked child the buffer
        is pushed to the creating process over the pipe.

        Raises
        ------
        ValueError
            If the stream has been closed.
        """
        if self.pub_socket is None:
            raise ValueError(u'I/O operation on closed file')

        mp_mode = self._check_mp_mode()

        if mp_mode != CHILD:
            # we are master
            if not self._is_master_thread():
                # sub-threads must not trigger flush directly,
                # but at least they can schedule an async flush, or force the timer.
                self._schedule_flush()
                return

            self._flush_from_subprocesses()
            data = self._flush_buffer()

            if data:
                content = {u'name': self.name, u'data': data}
                self.session.send(self.pub_socket, u'stream', content=content,
                                  parent=self.parent_header, ident=self.topic)

                if hasattr(self.pub_socket, 'flush'):
                    # socket itself has flush (presumably ZMQStream)
                    self.pub_socket.flush()
        else:
            with self._pipe_out_lock:
                string = self._flush_buffer()
                tracker = self._pipe_out.send_multipart([
                    self._pipe_uuid,
                    string.encode(self.encoding, 'replace'),
                ], copy=False, track=True)
                try:
                    tracker.wait(1)
                except Exception:
                    # Waiting for delivery is best-effort: ignore a timeout or
                    # other error here, but let KeyboardInterrupt/SystemExit
                    # propagate instead of swallowing them.
                    pass

    def isatty(self):
        """Always False."""
        return False

    def __next__(self):
        """Reading is unsupported; always raises IOError."""
        raise IOError('Read not supported on a write only stream.')

    if not py3compat.PY3:
        # Python 2 iterator protocol spelling
        next = __next__

    def read(self, size=-1):
        """Reading is unsupported; always raises IOError."""
        raise IOError('Read not supported on a write only stream.')

    def readline(self, size=-1):
        """Reading is unsupported; always raises IOError."""
        raise IOError('Read not supported on a write only stream.')

    def fileno(self):
        """Always raises UnsupportedOperation."""
        raise UnsupportedOperation("IOStream has no fileno.")

    def write(self, string):
        """Append *string* to the buffer, flushing when due.

        Non-unicode input is decoded with ``self.encoding``.  In a forked
        child, any write containing a newline flushes immediately.  A flush
        is also triggered when more than ``flush_interval`` seconds have
        passed since the flush timer was started.

        Raises
        ------
        ValueError
            If the stream has been closed.
        """
        if self.pub_socket is None:
            raise ValueError('I/O operation on closed file')

        # Make sure that we're handling unicode
        if not isinstance(string, unicode_type):
            string = string.decode(self.encoding, 'replace')

        is_child = (self._check_mp_mode() == CHILD)
        self._buffer.write(string)
        if is_child:
            # newlines imply flush in subprocesses
            # mp.Pool cannot be trusted to flush promptly (or ever),
            # and this helps.
            if '\n' in string:
                self.flush()
        # NOTE: subprocess output is not collected here on write; that only
        # happens when flush() runs in the creating process.
        current_time = time.time()
        if self._start < 0:
            # timer not running yet (fresh buffer): start it
            self._start = current_time
        elif current_time - self._start > self.flush_interval:
            self.flush()

    def writelines(self, sequence):
        """Write each string of *sequence* in order.

        Raises
        ------
        ValueError
            If the stream has been closed.
        """
        if self.pub_socket is None:
            raise ValueError('I/O operation on closed file')
        for string in sequence:
            self.write(string)

    def _flush_buffer(self):
        """Clear the current buffer and return the current buffer data."""
        data = u''
        if self._buffer is not None:
            data = self._buffer.getvalue()
            self._buffer.close()
        self._new_buffer()
        return data

    def _new_buffer(self):
        """Install an empty buffer and mark the flush timer as not started."""
        self._buffer = StringIO()
        self._start = -1