iostream.py
239 lines
| 8.1 KiB
| text/x-python
|
PythonLexer
|
r2754 | import sys | ||
import time | ||||
|
r9436 | import os | ||
|
r9440 | import threading | ||
import uuid | ||||
|
r4070 | from io import StringIO | ||
|
r2754 | |||
|
r9440 | import zmq | ||
|
r2754 | from session import extract_header, Message | ||
|
r8595 | from IPython.utils import io, text | ||
|
r7847 | from IPython.utils import py3compat | ||
|
r2938 | |||
|
r9436 | import multiprocessing as mp | ||
|
r9440 | # import multiprocessing.sharedctypes as mpshc | ||
|
r9436 | from ctypes import c_bool | ||
|
r2754 | #----------------------------------------------------------------------------- | ||
|
r3322 | # Globals | ||
|
r2754 | #----------------------------------------------------------------------------- | ||
|
r3322 | |||
|
r9440 | MASTER_NO_CHILDREN = 0 | ||
MASTER_WITH_CHILDREN = 1 | ||||
CHILD = 2 | ||||
|
r3322 | #----------------------------------------------------------------------------- | ||
|
r2754 | # Stream classes | ||
#----------------------------------------------------------------------------- | ||||
class OutStream(object): | ||||
"""A file like object that publishes the stream to a 0MQ PUB socket.""" | ||||
# The time interval between automatic flushes, in seconds. | ||||
flush_interval = 0.05 | ||||
|
r3602 | topic=None | ||
|
r2754 | def __init__(self, session, pub_socket, name): | ||
|
r8595 | self.encoding = 'UTF-8' | ||
|
r2754 | self.session = session | ||
self.pub_socket = pub_socket | ||||
self.name = name | ||||
self.parent_header = {} | ||||
self._new_buffer() | ||||
|
r9440 | self._found_newprocess = threading.Event() | ||
self._buffer_lock = threading.Lock() | ||||
self._master_pid = os.getpid() | ||||
self._master_thread = threading.current_thread().ident | ||||
self._pipe_pid = os.getpid() | ||||
self._setup_pipe_in() | ||||
def _setup_pipe_in(self): | ||||
"""setup listening pipe for subprocesses""" | ||||
ctx = self._pipe_ctx = zmq.Context() | ||||
# signal pair for terminating background thread | ||||
self._pipe_signaler = ctx.socket(zmq.PAIR) | ||||
self._pipe_signalee = ctx.socket(zmq.PAIR) | ||||
self._pipe_signaler.bind("inproc://ostream_pipe") | ||||
self._pipe_signalee.connect("inproc://ostream_pipe") | ||||
# thread event to signal cleanup is done | ||||
self._pipe_done = threading.Event() | ||||
# use UUID to authenticate pipe messages | ||||
self._pipe_uuid = uuid.uuid4().bytes | ||||
self._pipe_thread = threading.Thread(target=self._pipe_main) | ||||
self._pipe_thread.start() | ||||
def _setup_pipe_out(self): | ||||
# must be new context after fork | ||||
ctx = zmq.Context() | ||||
self._pipe_pid = os.getpid() | ||||
self._pipe_out = ctx.socket(zmq.PUSH) | ||||
self._pipe_out_lock = threading.Lock() | ||||
self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port) | ||||
def _pipe_main(self): | ||||
"""eventloop for receiving""" | ||||
ctx = self._pipe_ctx | ||||
self._pipe_in = ctx.socket(zmq.PULL) | ||||
self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1") | ||||
poller = zmq.Poller() | ||||
poller.register(self._pipe_signalee, zmq.POLLIN) | ||||
poller.register(self._pipe_in, zmq.POLLIN) | ||||
while True: | ||||
if not self._is_master_process(): | ||||
return | ||||
try: | ||||
events = dict(poller.poll(1000)) | ||||
except zmq.ZMQError: | ||||
# should only be triggered by process-ending cleanup | ||||
return | ||||
if self._pipe_signalee in events: | ||||
break | ||||
if self._pipe_in in events: | ||||
msg = self._pipe_in.recv_multipart() | ||||
if msg[0] != self._pipe_uuid: | ||||
# message not authenticated | ||||
continue | ||||
self._found_newprocess.set() | ||||
text = msg[1].decode(self.encoding, 'replace') | ||||
with self._buffer_lock: | ||||
self._buffer.write(text) | ||||
if self._start < 0: | ||||
self._start = time.time() | ||||
# wrap it up | ||||
self._pipe_signaler.close() | ||||
self._pipe_signalee.close() | ||||
self._pipe_in.close() | ||||
self._pipe_ctx.term() | ||||
self._pipe_done.set() | ||||
def __del__(self): | ||||
if not self._is_master_process(): | ||||
return | ||||
self._pipe_signaler.send(b'die') | ||||
self._pipe_done.wait(10) | ||||
|
r9436 | def _is_master_process(self): | ||
|
r9440 | return os.getpid() == self._master_pid | ||
def _is_master_thread(self): | ||||
return threading.current_thread().ident == self._master_thread | ||||
def _have_pipe_out(self): | ||||
return os.getpid() == self._pipe_pid | ||||
|
r9436 | |||
def _check_mp_mode(self): | ||||
|
r9440 | """check for forks, and switch to zmq pipeline if necessary""" | ||
if self._is_master_process(): | ||||
if self._found_newprocess.is_set(): | ||||
return MASTER_WITH_CHILDREN | ||||
else: | ||||
return MASTER_NO_CHILDREN | ||||
else: | ||||
if not self._have_pipe_out(): | ||||
# setup a new out pipe | ||||
self._setup_pipe_out() | ||||
return CHILD | ||||
|
r2754 | |||
def set_parent(self, parent): | ||||
self.parent_header = extract_header(parent) | ||||
def close(self): | ||||
self.pub_socket = None | ||||
def flush(self): | ||||
|
r9440 | """trigger actual zmq send""" | ||
|
r2754 | if self.pub_socket is None: | ||
raise ValueError(u'I/O operation on closed file') | ||||
else: | ||||
|
r9436 | if self._is_master_process(): | ||
|
r9440 | if not self._is_master_thread(): | ||
# sub-threads mustn't trigger flush, | ||||
# but at least they can force the timer. | ||||
self._start = 0 | ||||
|
r9436 | data = u'' | ||
|
r9440 | # obtain data | ||
if self._check_mp_mode(): # multiprocess, needs a lock | ||||
with self._buffer_lock: | ||||
data = self._buffer.getvalue() | ||||
self._buffer.close() | ||||
self._new_buffer() | ||||
else: # single process mode | ||||
|
r9436 | data = self._buffer.getvalue() | ||
|
r9440 | self._buffer.close() | ||
self._new_buffer() | ||||
|
r9436 | |||
if data: | ||||
content = {u'name':self.name, u'data':data} | ||||
msg = self.session.send(self.pub_socket, u'stream', content=content, | ||||
parent=self.parent_header, ident=self.topic) | ||||
if hasattr(self.pub_socket, 'flush'): | ||||
# socket itself has flush (presumably ZMQStream) | ||||
self.pub_socket.flush() | ||||
else: | ||||
|
r9440 | self._check_mp_mode() | ||
with self._pipe_out_lock: | ||||
tracker = self._pipe_out.send(b'', copy=False, track=True) | ||||
tracker.wait(1) | ||||
|
r9436 | |||
|
r2754 | |||
def isatty(self): | ||||
return False | ||||
|
r7847 | def __next__(self): | ||
|
r2754 | raise IOError('Read not supported on a write only stream.') | ||
|
r7847 | if not py3compat.PY3: | ||
next = __next__ | ||||
|
r2754 | def read(self, size=-1): | ||
raise IOError('Read not supported on a write only stream.') | ||||
def readline(self, size=-1): | ||||
raise IOError('Read not supported on a write only stream.') | ||||
def write(self, string): | ||||
if self.pub_socket is None: | ||||
raise ValueError('I/O operation on closed file') | ||||
else: | ||||
|
r4070 | # Make sure that we're handling unicode | ||
if not isinstance(string, unicode): | ||||
|
r8595 | string = string.decode(self.encoding, 'replace') | ||
|
r9440 | |||
mp_mode = self._check_mp_mode() | ||||
if mp_mode == CHILD: | ||||
with self._pipe_out_lock: | ||||
self._pipe_out.send_multipart([ | ||||
self._pipe_uuid, | ||||
string.encode(self.encoding, 'replace'), | ||||
]) | ||||
return | ||||
elif mp_mode == MASTER_NO_CHILDREN: | ||||
|
r9436 | self._buffer.write(string) | ||
|
r9440 | elif mp_mode == MASTER_WITH_CHILDREN: | ||
with self._buffer_lock: | ||||
self._buffer.write(string) | ||||
|
r9436 | |||
|
r2754 | current_time = time.time() | ||
|
r9440 | if self._start < 0: | ||
|
r2754 | self._start = current_time | ||
elif current_time - self._start > self.flush_interval: | ||||
self.flush() | ||||
def writelines(self, sequence): | ||||
if self.pub_socket is None: | ||||
raise ValueError('I/O operation on closed file') | ||||
else: | ||||
for string in sequence: | ||||
self.write(string) | ||||
def _new_buffer(self): | ||||
self._buffer = StringIO() | ||||
|
r2938 | self._start = -1 | ||