##// END OF EJS Templates
#7548: Added a rule for help lines (starting with a "?").
#7548: Added a rule for help lines (starting with a "?").

File last commit:

r17057:8818818d
r20147:5bf694c9
Show More
util.py
389 lines | 11.8 KiB | text/x-python | PythonLexer
MinRK
update header in parallel.util
r16140 """Some generic utilities for dealing with classes, urls, and serialization."""
MinRK
update recently changed modules with Authors in docstring
r4018
Thomas Kluyver
Fix non-ascii spaces in comment....
r16415 # Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
MinRK
update recently changed modules with Authors in docstring
r4018
MinRK
organize IPython.parallel into subpackages
r3673 import logging
import os
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 import re
MinRK
organize IPython.parallel into subpackages
r3673 import stat
MinRK
persist connection data to disk as json
r3614 import socket
MinRK
organize IPython.parallel into subpackages
r3673 import sys
MinRK
disambiguate to location when no IPs can be determined...
r16139 import warnings
MinRK
organize IPython.parallel into subpackages
r3673 from signal import signal, SIGINT, SIGABRT, SIGTERM
try:
from signal import SIGKILL
except ImportError:
SIGKILL=None
MinRK
make @interactive decorator friendlier with dill...
r13877 from types import FunctionType
MinRK
improved client.get_results() behavior
r3598
MinRK
cleanup pass
r3644 try:
import cPickle
pickle = cPickle
except:
cPickle = None
import pickle
MinRK
organize IPython.parallel into subpackages
r3673 import zmq
from zmq.log import handlers
MinRK
cleanup pass
r3644
MinRK
use utils.log.get_logger where appropriate
r17057 from IPython.utils.log import get_logger
MinRK
add log_errors decorator for on_recv callbacks...
r6324 from IPython.external.decorator import decorator
MinRK
IPython.parallel logging cleanup...
r4506 from IPython.config.application import Application
MinRK
avoid executing code in utils.localinterfaces at import time...
r12591 from IPython.utils.localinterfaces import localhost, is_public_ip, public_ips
Thomas Kluyver
Fix references to dict.iteritems and dict.itervalues
r13361 from IPython.utils.py3compat import string_types, iteritems, itervalues
MinRK
mv IPython.zmq to IPython.kernel.zmq
r9372 from IPython.kernel.zmq.log import EnginePUBHandler
Spencer Nelson
Remove unused imports
r16525
MinRK
cleanup pass
r3644
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
MinRK
reflect revised apply_bound pattern
r3655 class Namespace(dict):
"""Subclass of dict for attribute access to keys."""
def __getattr__(self, key):
"""getattr aliased to getitem"""
Thomas Kluyver
Remove uses of iterkeys
r13360 if key in self:
MinRK
reflect revised apply_bound pattern
r3655 return self[key]
else:
raise NameError(key)
def __setattr__(self, key, value):
"""setattr aliased to setitem, with strict"""
if hasattr(dict, key):
raise KeyError("Cannot override dict keys %r"%key)
self[key] = value
MinRK
improved client.get_results() behavior
r3598 class ReverseDict(dict):
"""simple double-keyed subset of dict methods."""
def __init__(self, *args, **kwargs):
dict.__init__(self, *args, **kwargs)
self._reverse = dict()
Thomas Kluyver
Fix references to dict.iteritems and dict.itervalues
r13361 for key, value in iteritems(self):
MinRK
improved client.get_results() behavior
r3598 self._reverse[value] = key
def __getitem__(self, key):
try:
return dict.__getitem__(self, key)
except KeyError:
return self._reverse[key]
def __setitem__(self, key, value):
if key in self._reverse:
raise KeyError("Can't have key %r on both sides!"%key)
dict.__setitem__(self, key, value)
self._reverse[value] = key
def pop(self, key):
value = dict.pop(self, key)
MinRK
tasks on engines when they die fail instead of hang...
r3612 self._reverse.pop(value)
MinRK
improved client.get_results() behavior
r3598 return value
def get(self, key, default=None):
try:
return self[key]
except KeyError:
return default
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
# Functions
#-----------------------------------------------------------------------------
MinRK
add log_errors decorator for on_recv callbacks...
r6324 @decorator
def log_errors(f, self, *args, **kwargs):
"""decorator to log unhandled exceptions raised in a method.
For use wrapping on_recv callbacks, so that exceptions
do not cause the stream to be closed.
"""
try:
return f(self, *args, **kwargs)
except Exception:
self.log.error("Uncaught exception in %r" % f, exc_info=True)
Hannes Schulz
introduce is_url() replacing validate_url() in cases where it is used in a control structure (as opposed to an assert)
r5078 def is_url(url):
"""boolean check for whether a string is a zmq url"""
if '://' not in url:
return False
proto, addr = url.split('://', 1)
if proto.lower() not in ['tcp','pgm','epgm','ipc','inproc']:
return False
return True
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 def validate_url(url):
"""validate a url for zeromq"""
Thomas Kluyver
Replace references to unicode and basestring
r13353 if not isinstance(url, string_types):
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 raise TypeError("url must be a string, not %r"%type(url))
url = url.lower()
proto_addr = url.split('://')
assert len(proto_addr) == 2, 'Invalid url: %r'%url
proto, addr = proto_addr
assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
# domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
# author: Remi Sabourin
pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
if proto == 'tcp':
lis = addr.split(':')
assert len(lis) == 2, 'Invalid url: %r'%url
addr,s_port = lis
try:
port = int(s_port)
except ValueError:
raise AssertionError("Invalid port %r in url: %r"%(port, url))
MinRK
API update involving map and load-balancing
r3635 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603
else:
# only validate tcp urls currently
pass
return True
def validate_url_container(container):
"""validate a potentially nested collection of urls."""
Thomas Kluyver
Replace references to unicode and basestring
r13353 if isinstance(container, string_types):
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 url = container
return validate_url(url)
elif isinstance(container, dict):
Thomas Kluyver
Fix references to dict.iteritems and dict.itervalues
r13361 container = itervalues(container)
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603
for element in container:
MinRK
persist connection data to disk as json
r3614 validate_url_container(element)
def split_url(url):
"""split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
proto_addr = url.split('://')
assert len(proto_addr) == 2, 'Invalid url: %r'%url
proto, addr = proto_addr
lis = addr.split(':')
assert len(lis) == 2, 'Invalid url: %r'%url
addr,s_port = lis
return proto,addr,s_port
MinRK
disambiguate to location when no IPs can be determined...
r16139
MinRK
persist connection data to disk as json
r3614 def disambiguate_ip_address(ip, location=None):
MinRK
disambiguate to location when no IPs can be determined...
r16139 """turn multi-ip interfaces '0.0.0.0' and '*' into a connectable address
Explicit IP addresses are returned unmodified.
Parameters
----------
ip : IP address
An IP address, or the special values 0.0.0.0, or *
location: IP address, optional
A public IP of the target machine.
If location is an IP of the current machine,
localhost will be returned,
otherwise location will be returned.
"""
if ip in {'0.0.0.0', '*'}:
if not location:
# unspecified location, localhost is the only choice
ip = localhost()
elif is_public_ip(location):
# location is a public IP on this machine, use localhost
MinRK
avoid executing code in utils.localinterfaces at import time...
r12591 ip = localhost()
MinRK
disambiguate to location when no IPs can be determined...
r16139 elif not public_ips():
# this machine's public IPs cannot be determined,
# assume `location` is not this machine
warnings.warn("IPython could not determine public IPs", RuntimeWarning)
ip = location
else:
# location is not this machine, do not use loopback
ip = location
MinRK
persist connection data to disk as json
r3614 return ip
MinRK
disambiguate to location when no IPs can be determined...
r16139
MinRK
persist connection data to disk as json
r3614 def disambiguate_url(url, location=None):
"""turn multi-ip interfaces '0.0.0.0' and '*' into connectable
ones, based on the location (default interpretation is localhost).
Thomas Kluyver
Various docs fixes
r13595 This is for zeromq urls, such as ``tcp://*:10101``.
"""
MinRK
persist connection data to disk as json
r3614 try:
proto,ip,port = split_url(url)
except AssertionError:
# probably not tcp url; could be ipc, etc.
return url
ip = disambiguate_ip_address(ip,location)
return "%s://%s:%s"%(proto,ip,port)
MinRK
cleanup pass
r3644
MinRK
update API after sagedays29...
r3664 #--------------------------------------------------------------------------
# helpers for implementing old MEC API via view.apply
#--------------------------------------------------------------------------
def interactive(f):
"""decorator for making functions appear as interactively defined.
This results in the function being linked to the user_ns as globals()
instead of the module globals().
"""
MinRK
make @interactive decorator friendlier with dill...
r13877
# build new FunctionType, so it can have the right globals
# interactive functions never have closures, that's kind of the point
MinRK
still allow @interactive on Classes
r13879 if isinstance(f, FunctionType):
mainmod = __import__('__main__')
f = FunctionType(f.__code__, mainmod.__dict__,
f.__name__, f.__defaults__,
)
MinRK
make @interactive decorator friendlier with dill...
r13877 # associate with __main__ for uncanning
MinRK
still allow @interactive on Classes
r13879 f.__module__ = '__main__'
return f
MinRK
update API after sagedays29...
r3664
@interactive
MinRK
enable non-copying sends in push...
r6243 def _push(**ns):
MinRK
update API after sagedays29...
r3664 """helper method for implementing `client.push` via `client.apply`"""
Bradley M. Froehle
Parallel: Support get/set of nested objects in view (e.g. dv['a.b'])
r8271 user_ns = globals()
tmp = '_IP_PUSH_TMP_'
while tmp in user_ns:
tmp = tmp + '_'
try:
Thomas Kluyver
Fix parallel test suite
r13383 for name, value in ns.items():
Bradley M. Froehle
Parallel: Support get/set of nested objects in view (e.g. dv['a.b'])
r8271 user_ns[tmp] = value
Thomas Kluyver
Fix exec statements for Py 3...
r13350 exec("%s = %s" % (name, tmp), user_ns)
Bradley M. Froehle
Parallel: Support get/set of nested objects in view (e.g. dv['a.b'])
r8271 finally:
user_ns.pop(tmp, None)
MinRK
update API after sagedays29...
r3664
@interactive
def _pull(keys):
"""helper method for implementing `client.pull` via `client.apply`"""
if isinstance(keys, (list,tuple, set)):
Thomas Kluyver
Fix parallel test suite
r13383 return [eval(key, globals()) for key in keys]
MinRK
update API after sagedays29...
r3664 else:
Bradley M. Froehle
Parallel: Support get/set of nested objects in view (e.g. dv['a.b'])
r8271 return eval(keys, globals())
MinRK
update API after sagedays29...
r3664
@interactive
def _execute(code):
"""helper method for implementing `client.execute` via `client.apply`"""
Thomas Kluyver
Fix exec statements for Py 3...
r13350 exec(code, globals())
MinRK
update API after sagedays29...
r3664
MinRK
organize IPython.parallel into subpackages
r3673 #--------------------------------------------------------------------------
# extra process management utilities
#--------------------------------------------------------------------------
_random_ports = set()
def select_random_ports(n):
"""Selects and return n random ports that are available."""
ports = []
Thomas Kluyver
Fix references to xrange
r13356 for i in range(n):
MinRK
organize IPython.parallel into subpackages
r3673 sock = socket.socket()
sock.bind(('', 0))
while sock.getsockname()[1] in _random_ports:
sock.close()
sock = socket.socket()
sock.bind(('', 0))
ports.append(sock)
for i, sock in enumerate(ports):
port = sock.getsockname()[1]
sock.close()
ports[i] = port
_random_ports.add(port)
return ports
def signal_children(children):
"""Relay interupt/term signals to children, for more solid process cleanup."""
def terminate_children(sig, frame):
MinRK
use utils.log.get_logger where appropriate
r17057 log = get_logger()
MinRK
IPython.parallel logging cleanup...
r4506 log.critical("Got signal %i, terminating children..."%sig)
MinRK
organize IPython.parallel into subpackages
r3673 for child in children:
child.terminate()
sys.exit(sig != SIGINT)
# sys.exit(sig)
for sig in (SIGINT, SIGABRT, SIGTERM):
signal(sig, terminate_children)
def generate_exec_key(keyfile):
import uuid
newkey = str(uuid.uuid4())
with open(keyfile, 'w') as f:
# f.write('ipython-key ')
f.write(newkey+'\n')
# set user-only RW permissions (0600)
# this will have no effect on Windows
os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
def integer_loglevel(loglevel):
try:
loglevel = int(loglevel)
except ValueError:
if isinstance(loglevel, str):
loglevel = getattr(logging, loglevel)
return loglevel
def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
logger = logging.getLogger(logname)
if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
# don't add a second PUBHandler
return
loglevel = integer_loglevel(loglevel)
lsock = context.socket(zmq.PUB)
lsock.connect(iface)
handler = handlers.PUBHandler(lsock)
handler.setLevel(loglevel)
handler.root_topic = root
logger.addHandler(handler)
logger.setLevel(loglevel)
def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
logger = logging.getLogger()
if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
# don't add a second PUBHandler
return
loglevel = integer_loglevel(loglevel)
lsock = context.socket(zmq.PUB)
lsock.connect(iface)
handler = EnginePUBHandler(engine, lsock)
handler.setLevel(loglevel)
logger.addHandler(handler)
logger.setLevel(loglevel)
MinRK
reorganize Factory classes to follow relocation of Session object
r4007 return logger
MinRK
organize IPython.parallel into subpackages
r3673
def local_logger(logname, loglevel=logging.DEBUG):
loglevel = integer_loglevel(loglevel)
logger = logging.getLogger(logname)
if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
# don't add a second StreamHandler
return
handler = logging.StreamHandler()
handler.setLevel(loglevel)
MinRK
match log format in Scheduler to rest of parallel apps
r5888 formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S")
handler.setFormatter(formatter)
MinRK
organize IPython.parallel into subpackages
r3673 logger.addHandler(handler)
logger.setLevel(loglevel)
MinRK
reorganize Factory classes to follow relocation of Session object
r4007 return logger
MinRK
General improvements to database backend...
r3780
MinRK
set unlimited HWM for all relay devices...
r10614 def set_hwm(sock, hwm=0):
"""set zmq High Water Mark on a socket
in a way that always works for various pyzmq / libzmq versions.
"""
import zmq
for key in ('HWM', 'SNDHWM', 'RCVHWM'):
opt = getattr(zmq, key, None)
if opt is None:
continue
try:
sock.setsockopt(opt, hwm)
except zmq.ZMQError:
pass
Spencer Nelson
Remove unused imports
r16525