##// END OF EJS Templates
payload.write_payload: use `single` keyword instead of `update`...
payload.write_payload: use `single` keyword instead of `update` * change default behavior when adding payloads to single=True * document write_payload

File last commit:

r11697:ee929cdc
r12933:b8499e39
Show More
iostream.py
220 lines | 7.4 KiB | text/x-python | PythonLexer
MinRK
cleanup unused symbols in iostream...
r9444 """wrappers for stdout/stderr forwarding over zmq
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2013 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 import os
MinRK
use zmq to protect subprocess stdout after fork
r9440 import threading
MinRK
raise UnsupportedOperation on iostream.fileno()...
r10193 import time
MinRK
use zmq to protect subprocess stdout after fork
r9440 import uuid
MinRK
raise UnsupportedOperation on iostream.fileno()...
r10193 from io import StringIO, UnsupportedOperation
Brian Granger
Separating kernel into smaller pieces.
r2754
MinRK
use zmq to protect subprocess stdout after fork
r9440 import zmq
MinRK
cleanup unused symbols in iostream...
r9444 from session import extract_header
Brian Granger
Separating kernel into smaller pieces.
r2754
Bradley M. Froehle
Apply 2to3 `next` fix....
r7847 from IPython.utils import py3compat
Fernando Perez
Add missing flush of output streams on execute
r2938
Brian Granger
Separating kernel into smaller pieces.
r2754 #-----------------------------------------------------------------------------
Fernando Perez
Merge branch 'kernel-logging' of https://github.com/omazapa/ipython into omazapa-kernel-logging...
r3322 # Globals
Brian Granger
Separating kernel into smaller pieces.
r2754 #-----------------------------------------------------------------------------
Fernando Perez
Merge branch 'kernel-logging' of https://github.com/omazapa/ipython into omazapa-kernel-logging...
r3322
MinRK
cleanup unused symbols in iostream...
r9444 MASTER = 0
CHILD = 1
MinRK
use zmq to protect subprocess stdout after fork
r9440
Fernando Perez
Merge branch 'kernel-logging' of https://github.com/omazapa/ipython into omazapa-kernel-logging...
r3322 #-----------------------------------------------------------------------------
Brian Granger
Separating kernel into smaller pieces.
r2754 # Stream classes
#-----------------------------------------------------------------------------
class OutStream(object):
"""A file like object that publishes the stream to a 0MQ PUB socket."""
# The time interval between automatic flushes, in seconds.
MinRK
allow adjustable subprocess flush limit
r9452 _subprocess_flush_limit = 256
Brian Granger
Separating kernel into smaller pieces.
r2754 flush_interval = 0.05
MinRK
propagate iopub to clients
r3602 topic=None
MinRK
allow disabling subprocess pipe
r9447 def __init__(self, session, pub_socket, name, pipe=True):
Bradley M. Froehle
Use 'UTF-8' as the `encoding` attribute in `OutStream` class....
r8595 self.encoding = 'UTF-8'
Brian Granger
Separating kernel into smaller pieces.
r2754 self.session = session
self.pub_socket = pub_socket
self.name = name
MinRK
set some topics on IOPub messages...
r11697 self.topic = b'stream.' + py3compat.cast_bytes(name)
Brian Granger
Separating kernel into smaller pieces.
r2754 self.parent_header = {}
self._new_buffer()
MinRK
use zmq to protect subprocess stdout after fork
r9440 self._buffer_lock = threading.Lock()
self._master_pid = os.getpid()
self._master_thread = threading.current_thread().ident
self._pipe_pid = os.getpid()
MinRK
allow disabling subprocess pipe
r9447 self._pipe_flag = pipe
if pipe:
self._setup_pipe_in()
MinRK
use zmq to protect subprocess stdout after fork
r9440
def _setup_pipe_in(self):
"""setup listening pipe for subprocesses"""
MinRK
subprocess iostreams only send on flush
r9443 ctx = self.pub_socket.context
MinRK
use zmq to protect subprocess stdout after fork
r9440
# use UUID to authenticate pipe messages
self._pipe_uuid = uuid.uuid4().bytes
MinRK
subprocess iostreams only send on flush
r9443 self._pipe_in = ctx.socket(zmq.PULL)
self._pipe_in.linger = 0
self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
self._pipe_poller = zmq.Poller()
self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
MinRK
use zmq to protect subprocess stdout after fork
r9440
def _setup_pipe_out(self):
# must be new context after fork
ctx = zmq.Context()
self._pipe_pid = os.getpid()
self._pipe_out = ctx.socket(zmq.PUSH)
self._pipe_out_lock = threading.Lock()
self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436 def _is_master_process(self):
MinRK
use zmq to protect subprocess stdout after fork
r9440 return os.getpid() == self._master_pid
def _is_master_thread(self):
return threading.current_thread().ident == self._master_thread
def _have_pipe_out(self):
return os.getpid() == self._pipe_pid
Piti Ongmongkolkul
Dual buffer with automatic switch. No performance hit for single process.
r9436
def _check_mp_mode(self):
MinRK
use zmq to protect subprocess stdout after fork
r9440 """check for forks, and switch to zmq pipeline if necessary"""
MinRK
allow disabling subprocess pipe
r9447 if not self._pipe_flag or self._is_master_process():
MinRK
cleanup unused symbols in iostream...
r9444 return MASTER
MinRK
use zmq to protect subprocess stdout after fork
r9440 else:
if not self._have_pipe_out():
MinRK
flush on newline in subprocesses...
r9451 self._flush_buffer()
MinRK
use zmq to protect subprocess stdout after fork
r9440 # setup a new out pipe
self._setup_pipe_out()
return CHILD
Brian Granger
Separating kernel into smaller pieces.
r2754
def set_parent(self, parent):
self.parent_header = extract_header(parent)
def close(self):
self.pub_socket = None
MinRK
subprocess iostreams only send on flush
r9443 def _flush_from_subprocesses(self):
"""flush possible pub data from subprocesses into my buffer"""
MinRK
allow disabling subprocess pipe
r9447 if not self._pipe_flag or not self._is_master_process():
MinRK
subprocess iostreams only send on flush
r9443 return
MinRK
allow adjustable subprocess flush limit
r9452 for i in range(self._subprocess_flush_limit):
MinRK
subprocess iostreams only send on flush
r9443 if self._pipe_poller.poll(0):
msg = self._pipe_in.recv_multipart()
if msg[0] != self._pipe_uuid:
continue
else:
self._buffer.write(msg[1].decode(self.encoding, 'replace'))
# this always means a flush,
# so reset our timer
self._start = 0
else:
break
Brian Granger
Separating kernel into smaller pieces.
r2754 def flush(self):
MinRK
use zmq to protect subprocess stdout after fork
r9440 """trigger actual zmq send"""
Brian Granger
Separating kernel into smaller pieces.
r2754 if self.pub_socket is None:
raise ValueError(u'I/O operation on closed file')
MinRK
subprocess iostreams only send on flush
r9443
mp_mode = self._check_mp_mode()
if mp_mode != CHILD:
# we are master
if not self._is_master_thread():
# sub-threads must not trigger flush,
# but at least they can force the timer.
self._start = 0
return
self._flush_from_subprocesses()
data = self._flush_buffer()
if data:
content = {u'name':self.name, u'data':data}
msg = self.session.send(self.pub_socket, u'stream', content=content,
parent=self.parent_header, ident=self.topic)
if hasattr(self.pub_socket, 'flush'):
# socket itself has flush (presumably ZMQStream)
self.pub_socket.flush()
Brian Granger
Separating kernel into smaller pieces.
r2754 else:
MinRK
subprocess iostreams only send on flush
r9443 with self._pipe_out_lock:
string = self._flush_buffer()
tracker = self._pipe_out.send_multipart([
self._pipe_uuid,
string.encode(self.encoding, 'replace'),
], copy=False, track=True)
MinRK
ignore error waiting for pipe flush message
r9453 try:
tracker.wait(1)
except:
pass
Brian Granger
Separating kernel into smaller pieces.
r2754
def isatty(self):
return False
Bradley M. Froehle
Apply 2to3 `next` fix....
r7847 def __next__(self):
Brian Granger
Separating kernel into smaller pieces.
r2754 raise IOError('Read not supported on a write only stream.')
Bradley M. Froehle
Apply 2to3 `next` fix....
r7847 if not py3compat.PY3:
next = __next__
Brian Granger
Separating kernel into smaller pieces.
r2754 def read(self, size=-1):
raise IOError('Read not supported on a write only stream.')
def readline(self, size=-1):
raise IOError('Read not supported on a write only stream.')
MinRK
raise UnsupportedOperation on iostream.fileno()...
r10193
def fileno(self):
raise UnsupportedOperation("IOStream has no fileno.")
Brian Granger
Separating kernel into smaller pieces.
r2754
def write(self, string):
if self.pub_socket is None:
raise ValueError('I/O operation on closed file')
else:
Thomas Kluyver
Handle unicode properly in IPython.zmq.iostream
r4070 # Make sure that we're handling unicode
if not isinstance(string, unicode):
Bradley M. Froehle
Use 'UTF-8' as the `encoding` attribute in `OutStream` class....
r8595 string = string.decode(self.encoding, 'replace')
MinRK
flush on newline in subprocesses...
r9451
is_child = (self._check_mp_mode() == CHILD)
MinRK
subprocess iostreams only send on flush
r9443 self._buffer.write(string)
MinRK
flush on newline in subprocesses...
r9451 if is_child:
# newlines imply flush in subprocesses
# mp.Pool cannot be trusted to flush promptly (or ever),
# and this helps.
if '\n' in string:
self.flush()
MinRK
subprocess iostreams only send on flush
r9443 # do we want to check subprocess flushes on write?
# self._flush_from_subprocesses()
Brian Granger
Separating kernel into smaller pieces.
r2754 current_time = time.time()
MinRK
use zmq to protect subprocess stdout after fork
r9440 if self._start < 0:
Brian Granger
Separating kernel into smaller pieces.
r2754 self._start = current_time
elif current_time - self._start > self.flush_interval:
self.flush()
def writelines(self, sequence):
if self.pub_socket is None:
raise ValueError('I/O operation on closed file')
else:
for string in sequence:
self.write(string)
MinRK
subprocess iostreams only send on flush
r9443 def _flush_buffer(self):
"""clear the current buffer and return the current buffer data"""
data = u''
if self._buffer is not None:
data = self._buffer.getvalue()
self._buffer.close()
self._new_buffer()
return data
Brian Granger
Separating kernel into smaller pieces.
r2754 def _new_buffer(self):
self._buffer = StringIO()
Fernando Perez
Add missing flush of output streams on execute
r2938 self._start = -1