##// END OF EJS Templates
Example of adding input transformers....
Example of adding input transformers. Closes gh-4709

File last commit:

r13595:41d387ee
r13887:19207476
Show More
util.py
369 lines | 11.3 KiB | text/x-python | PythonLexer
MinRK
update recently changed modules with Authors in docstring
r4018 """some generic utilities for dealing with classes, urls, and serialization
Authors:
* Min RK
"""
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
organize IPython.parallel into subpackages
r3673 # Standard library imports.
import logging
import os
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 import re
MinRK
organize IPython.parallel into subpackages
r3673 import stat
MinRK
persist connection data to disk as json
r3614 import socket
MinRK
organize IPython.parallel into subpackages
r3673 import sys
from signal import signal, SIGINT, SIGABRT, SIGTERM
try:
from signal import SIGKILL
except ImportError:
SIGKILL=None
MinRK
improved client.get_results() behavior
r3598
MinRK
cleanup pass
r3644 try:
import cPickle
pickle = cPickle
except:
cPickle = None
import pickle
MinRK
organize IPython.parallel into subpackages
r3673 # System library imports
import zmq
from zmq.log import handlers
MinRK
cleanup pass
r3644
MinRK
add log_errors decorator for on_recv callbacks...
r6324 from IPython.external.decorator import decorator
MinRK
organize IPython.parallel into subpackages
r3673 # IPython imports
MinRK
IPython.parallel logging cleanup...
r4506 from IPython.config.application import Application
MinRK
avoid executing code in utils.localinterfaces at import time...
r12591 from IPython.utils.localinterfaces import localhost, is_public_ip, public_ips
Thomas Kluyver
Fix references to dict.iteritems and dict.itervalues
r13361 from IPython.utils.py3compat import string_types, iteritems, itervalues
MinRK
mv IPython.zmq to IPython.kernel.zmq
r9372 from IPython.kernel.zmq.log import EnginePUBHandler
from IPython.kernel.zmq.serialize import (
MinRK
move apply serialization into zmq.serialize
r6788 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
)
MinRK
cleanup pass
r3644
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
MinRK
reflect revised apply_bound pattern
class Namespace(dict):
    """dict subclass whose keys can also be read and written as attributes."""

    def __getattr__(self, key):
        """Return ``self[key]``; a missing key raises NameError(key).

        Only called when normal attribute lookup fails, so real dict
        attributes and methods are never shadowed on read.
        """
        if key not in self:
            raise NameError(key)
        return self[key]

    def __setattr__(self, key, value):
        """Store ``value`` under ``key``, refusing names that dict defines.

        Raises KeyError if ``key`` is the name of an existing dict
        attribute (e.g. ``keys``), so dict methods cannot be masked.
        """
        if not hasattr(dict, key):
            self[key] = value
            return
        raise KeyError("Cannot override dict keys %r"%key)
MinRK
improved client.get_results() behavior
class ReverseDict(dict):
    """simple double-keyed subset of dict methods.

    Every ``key -> value`` pair is mirrored in ``self._reverse`` as
    ``value -> key``, and item lookup falls back to the reverse mapping,
    so ``d[key]`` and ``d[value]`` both work.  Values must be hashable.
    """

    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self._reverse = dict()
        # dict.items works on both Python 2 and 3 without a compat helper
        for key, value in dict.items(self):
            self._reverse[value] = key

    def __getitem__(self, key):
        """Look up ``key`` as a forward key first, then as a value."""
        try:
            return dict.__getitem__(self, key)
        except KeyError:
            return self._reverse[key]

    def __setitem__(self, key, value):
        """Set ``key -> value``; KeyError if ``key`` is already a value."""
        if key in self._reverse:
            raise KeyError("Can't have key %r on both sides!"%key)
        if key in self:
            # Overwriting an existing key: drop the reverse entry of the old
            # value, otherwise the old value would keep resolving to `key`.
            old = dict.__getitem__(self, key)
            if self._reverse.get(old) == key:
                del self._reverse[old]
        dict.__setitem__(self, key, value)
        self._reverse[value] = key

    def pop(self, key):
        """Remove forward ``key`` (and its reverse entry); return the value."""
        value = dict.pop(self, key)
        self._reverse.pop(value)
        return value

    def get(self, key, default=None):
        """Like ``self[key]`` (either direction), or ``default`` if absent."""
        try:
            return self[key]
        except KeyError:
            return default
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
# Functions
#-----------------------------------------------------------------------------
MinRK
add log_errors decorator for on_recv callbacks...
@decorator
def log_errors(f, self, *args, **kwargs):
    """decorator to log unhandled exceptions raised in a method.

    For use wrapping on_recv callbacks, so that exceptions
    do not cause the stream to be closed.

    Any ``Exception`` raised by ``f`` is reported through ``self.log.error``
    with the traceback attached, and the wrapper then returns None.
    """
    try:
        result = f(self, *args, **kwargs)
    except Exception:
        message = "Uncaught exception in %r" % f
        self.log.error(message, exc_info=True)
    else:
        return result
Hannes Schulz
introduce is_url() replacing validate_url() in cases where it is used in a control structure (as opposed to an assert)
def is_url(url):
    """boolean check for whether a string is a zmq url

    True when ``url`` contains ``://`` and the part before the first
    ``://`` is (case-insensitively) one of tcp, pgm, epgm, ipc, inproc.
    The address part is not inspected.
    """
    if '://' in url:
        scheme = url.split('://', 1)[0]
        return scheme.lower() in ('tcp', 'pgm', 'epgm', 'ipc', 'inproc')
    return False
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
def validate_url(url):
    """validate a url for zeromq

    Returns True for an acceptable url.  Raises TypeError if ``url`` is not
    a string, and AssertionError if it is malformed: not exactly one
    ``://``, unknown protocol, or (tcp only) a bad ``host:port`` part.
    Only tcp addresses are checked in detail.

    The checks raise AssertionError explicitly instead of using ``assert``
    so that they are not stripped when Python runs with ``-O``.
    """
    if not isinstance(url, string_types):
        raise TypeError("url must be a string, not %r"%type(url))
    url = url.lower()

    proto_addr = url.split('://')
    if len(proto_addr) != 2:
        raise AssertionError('Invalid url: %r'%url)
    proto, addr = proto_addr
    if proto not in ['tcp','pgm','epgm','ipc','inproc']:
        raise AssertionError("Invalid protocol: %r"%proto)

    if proto == 'tcp':
        lis = addr.split(':')
        if len(lis) != 2:
            raise AssertionError('Invalid url: %r'%url)
        addr,s_port = lis
        try:
            int(s_port)
        except ValueError:
            # report the offending text; the converted value does not exist here
            raise AssertionError("Invalid port %r in url: %r"%(s_port, url))

        # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
        # author: Remi Sabourin
        pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
        if addr != '*' and pat.match(addr) is None:
            raise AssertionError('Invalid url: %r'%url)
    # else: only validate tcp urls currently

    return True
def validate_url_container(container):
    """validate a potentially nested collection of urls.

    A string is validated directly with ``validate_url`` and that result is
    returned.  For a dict the values are visited; any other container is
    iterated as-is.  Each element is validated recursively, and in the
    container case nothing is returned (errors surface as exceptions).
    """
    if isinstance(container, string_types):
        return validate_url(container)

    if isinstance(container, dict):
        elements = itervalues(container)
    else:
        elements = container

    for item in elements:
        validate_url_container(item)
def split_url(url):
    """split a zmq url (tcp://ip:port) into ('tcp','ip','port').

    All three parts are returned as strings; the port is not converted.
    Raises AssertionError if ``url`` does not contain exactly one ``://``
    or the address part does not contain exactly one ``:``.

    The checks raise explicitly rather than via ``assert`` so they still
    run under ``python -O``, where assert statements are removed.
    """
    proto_addr = url.split('://')
    if len(proto_addr) != 2:
        raise AssertionError('Invalid url: %r'%url)
    proto, addr = proto_addr
    lis = addr.split(':')
    if len(lis) != 2:
        raise AssertionError('Invalid url: %r'%url)
    addr,s_port = lis
    return proto,addr,s_port
def disambiguate_ip_address(ip, location=None):
    """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
    ones, based on the location (default interpretation of location is localhost).

    Any other ``ip`` is returned unchanged.  For a wildcard ``ip``:
    ``localhost()`` is returned when ``location`` is None, when
    ``is_public_ip(location)`` is true, or when ``public_ips()`` is empty;
    otherwise a truthy ``location`` is returned as-is.
    """
    if ip not in ('0.0.0.0', '*'):
        return ip

    if location is None or is_public_ip(location) or not public_ips():
        # If location is unspecified or cannot be determined, assume local
        return localhost()

    if location:
        return location

    # falsy, non-None location: nothing better to offer than the input
    return ip
def disambiguate_url(url, location=None):
    """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
    ones, based on the location (default interpretation is localhost).

    This is for zeromq urls, such as ``tcp://*:10101``.

    A url that ``split_url`` rejects with AssertionError is returned
    unchanged.
    """
    try:
        parts = split_url(url)
    except AssertionError:
        # probably not tcp url; could be ipc, etc.
        return url

    proto, ip, port = parts
    return "%s://%s:%s"%(proto, disambiguate_ip_address(ip, location), port)
MinRK
cleanup pass
r3644
MinRK
update API after sagedays29...
r3664 #--------------------------------------------------------------------------
# helpers for implementing old MEC API via view.apply
#--------------------------------------------------------------------------
def interactive(f):
    """decorator for making functions appear as interactively defined.

    Sets ``f.__module__`` to ``'__main__'`` and returns the same function
    object.  The stated intent is that the function ends up using the
    user_ns as its globals() instead of this module's globals().
    """
    setattr(f, '__module__', '__main__')
    return f
@interactive
def _push(**ns):
    """helper method for implementing `client.push` via `client.apply`

    For each ``name=value`` pair, the value is parked in globals() under a
    temporary name and ``name = <temp>`` is executed there.  Because the
    assignment is executed as code, ``name`` may be any assignment target
    (for example a dotted path such as ``a.b``), not just an identifier.
    """
    # NOTE(review): the names are executed as code; callers must be trusted.
    user_ns = globals()

    # pick a temporary name that is not already in use
    placeholder = '_IP_PUSH_TMP_'
    while placeholder in user_ns:
        placeholder += '_'

    try:
        for target, obj in ns.items():
            user_ns[placeholder] = obj
            statement = "%s = %s" % (target, placeholder)
            exec(statement, user_ns)
    finally:
        # never leave the temporary behind, even if an assignment failed
        user_ns.pop(placeholder, None)
MinRK
update API after sagedays29...
r3664
@interactive
def _pull(keys):
    """helper method for implementing `client.pull` via `client.apply`

    ``keys`` is either one expression string or a list/tuple/set of them.
    Each expression is evaluated in globals(); a single string yields a
    single value, a collection yields a list of values.
    """
    # NOTE(review): the keys are evaluated as code; callers must be trusted.
    if not isinstance(keys, (list,tuple, set)):
        return eval(keys, globals())
    return [eval(expr, globals()) for expr in keys]
MinRK
update API after sagedays29...
r3664
@interactive
def _execute(code):
    """helper method for implementing `client.execute` via `client.apply`

    Executes ``code`` with globals() as its namespace; returns nothing.
    """
    # NOTE(review): runs arbitrary code; callers must be trusted.
    user_ns = globals()
    exec(code, user_ns)
MinRK
update API after sagedays29...
r3664
MinRK
organize IPython.parallel into subpackages
r3673 #--------------------------------------------------------------------------
# extra process management utilities
#--------------------------------------------------------------------------
# ports already handed out by select_random_ports in this process
_random_ports = set()

def select_random_ports(n):
    """Selects and return n random ports that are available.

    Each port is found by binding a socket to port 0 and reading back the
    port that was assigned.  A port already returned by an earlier call in
    this process is skipped.  All sockets stay bound until n ports have
    been found, so one call never yields duplicates; they are then closed,
    so the ports are not reserved once this function returns.

    Returns a list of n ints and records them in ``_random_ports``.
    """
    held = []
    try:
        for _ in range(n):
            while True:
                sock = socket.socket()
                try:
                    sock.bind(('', 0))
                except Exception:
                    sock.close()
                    raise
                if sock.getsockname()[1] not in _random_ports:
                    break
                # already handed out earlier: release it and try again
                sock.close()
            held.append(sock)
        ports = [sock.getsockname()[1] for sock in held]
    finally:
        # close every socket we still hold, including when a bind failed
        for sock in held:
            sock.close()

    _random_ports.update(ports)
    return ports
def signal_children(children):
    """Relay interrupt/term signals to children, for more solid process cleanup.

    Installs a handler for SIGINT, SIGABRT and SIGTERM that logs the signal
    at critical level, calls ``terminate()`` on every child, and then exits
    the process (exit status 0 for SIGINT, 1 otherwise).
    """
    def terminate_children(sig, frame):
        logger = Application.instance().log
        logger.critical("Got signal %i, terminating children..."%sig)
        for proc in children:
            proc.terminate()
        # bool exit status: False (0) on SIGINT, True (1) for the others
        sys.exit(sig != SIGINT)

    for signum in (SIGINT, SIGABRT, SIGTERM):
        signal(signum, terminate_children)
def generate_exec_key(keyfile):
    """Write a fresh random key (a uuid4 string plus newline) to ``keyfile``.

    The file is created with user-only read/write permissions (0600) from
    the start, rather than being created with default permissions and
    restricted afterwards.  An existing file is truncated and its
    permissions are reset to 0600 as well.
    """
    import uuid
    newkey = str(uuid.uuid4())
    mode = stat.S_IRUSR|stat.S_IWUSR
    # request the restrictive mode at creation time so the key is never
    # readable by other users, not even briefly
    fd = os.open(keyfile, os.O_WRONLY|os.O_CREAT|os.O_TRUNC, mode)
    with os.fdopen(fd, 'w') as f:
        f.write(newkey+'\n')
    # set user-only RW permissions (0600) for a pre-existing file too
    # this will have no effect on Windows
    os.chmod(keyfile, mode)
def integer_loglevel(loglevel):
    """Convert a log level given as a number or a name into an int.

    Anything ``int()`` accepts is converted.  A string that is not numeric
    is looked up as an attribute of the ``logging`` module (e.g. 'DEBUG').
    Any other value for which ``int()`` raises ValueError is returned
    unchanged.
    """
    try:
        return int(loglevel)
    except ValueError:
        if not isinstance(loglevel, str):
            return loglevel
        return getattr(logging, loglevel)
def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
    """Attach a zmq PUBHandler to the logger named ``logname``.

    A PUB socket is created from ``context`` and connected to ``iface``;
    the handler publishes with ``root`` as its root topic.  Both handler
    and logger are set to ``loglevel`` (an int, numeric string or level
    name).  If the logger already has a PUBHandler it is left untouched.

    Returns the logger in both cases, so the result can be used directly.
    """
    logger = logging.getLogger(logname)
    if any(isinstance(h, handlers.PUBHandler) for h in logger.handlers):
        # don't add a second PUBHandler
        return logger
    loglevel = integer_loglevel(loglevel)
    lsock = context.socket(zmq.PUB)
    lsock.connect(iface)
    handler = handlers.PUBHandler(lsock)
    handler.setLevel(loglevel)
    handler.root_topic = root
    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger
def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
    """Attach an EnginePUBHandler for ``engine`` to the root logger.

    A PUB socket is created from ``context`` and connected to ``iface``.
    Both handler and root logger are set to ``loglevel`` (an int, numeric
    string or level name).  If the root logger already has a PUBHandler it
    is left untouched.

    Returns the root logger in both cases.
    """
    logger = logging.getLogger()
    if any(isinstance(h, handlers.PUBHandler) for h in logger.handlers):
        # don't add a second PUBHandler
        return logger
    loglevel = integer_loglevel(loglevel)
    lsock = context.socket(zmq.PUB)
    lsock.connect(iface)
    handler = EnginePUBHandler(engine, lsock)
    handler.setLevel(loglevel)
    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger
MinRK
organize IPython.parallel into subpackages
r3673
def local_logger(logname, loglevel=logging.DEBUG):
    """Attach a StreamHandler to the logger named ``logname``.

    The handler uses a timestamped ``[name] message`` format; handler and
    logger are both set to ``loglevel`` (an int, numeric string or level
    name).  If the logger already has a StreamHandler it is left untouched.

    Returns the logger in both cases, so callers that keep the result
    never end up holding None.
    """
    loglevel = integer_loglevel(loglevel)
    logger = logging.getLogger(logname)
    if any(isinstance(h, logging.StreamHandler) for h in logger.handlers):
        # don't add a second StreamHandler
        return logger
    handler = logging.StreamHandler()
    handler.setLevel(loglevel)
    formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
                                  datefmt="%Y-%m-%d %H:%M:%S")
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger
MinRK
General improvements to database backend...
r3780
MinRK
set unlimited HWM for all relay devices...
def set_hwm(sock, hwm=0):
    """set zmq High Water Mark on a socket

    in a way that always works for various pyzmq / libzmq versions.

    Tries each of the HWM, SNDHWM and RCVHWM options that the installed
    ``zmq`` module defines, and ignores any ZMQError from setting one.
    """
    import zmq

    for name in ('HWM', 'SNDHWM', 'RCVHWM'):
        option = getattr(zmq, name, None)
        if option is not None:
            try:
                sock.setsockopt(option, hwm)
            except zmq.ZMQError:
                # best effort: this option is not accepted here
                pass