##// END OF EJS Templates
Refactor newparallel to use Config system...
Refactor newparallel to use Config system This is working, but incomplete.

File last commit:

r3604:2c044319
r3604:2c044319
Show More
entry_point.py
167 lines | 5.6 KiB | text/x-python | PythonLexer
""" Defines helper functions for creating kernel entry points and process
launchers.
************
NOTE: Most of this module has been deprecated by moving to Configurables
************
"""
# Standard library imports.
import logging
import atexit
import sys
import os
import stat
import socket
from subprocess import Popen, PIPE
from signal import signal, SIGINT, SIGABRT, SIGTERM
try:
from signal import SIGKILL
except ImportError:
SIGKILL=None
# System library imports.
import zmq
from zmq.log import handlers
# Local imports.
from IPython.core.ultratb import FormattedTB
from IPython.external.argparse import ArgumentParser
from IPython.zmq.log import EnginePUBHandler
def split_ports(s, n):
"""Parser helper for multiport strings"""
if not s:
return tuple([0]*n)
ports = map(int, s.split(','))
if len(ports) != n:
raise ValueError
return ports
_random_ports = set()
def select_random_ports(n):
"""Selects and return n random ports that are available."""
ports = []
for i in xrange(n):
sock = socket.socket()
sock.bind(('', 0))
while sock.getsockname()[1] in _random_ports:
sock.close()
sock = socket.socket()
sock.bind(('', 0))
ports.append(sock)
for i, sock in enumerate(ports):
port = sock.getsockname()[1]
sock.close()
ports[i] = port
_random_ports.add(port)
return ports
def parse_url(args):
"""Ensure args.url contains full transport://interface:port"""
if args.url:
iface = args.url.split('://',1)
if len(args) == 2:
args.transport,iface = iface
iface = iface.split(':')
args.ip = iface[0]
if iface[1]:
args.regport = iface[1]
args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport)
def signal_children(children):
"""Relay interupt/term signals to children, for more solid process cleanup."""
def terminate_children(sig, frame):
logging.critical("Got signal %i, terminating children..."%sig)
for child in children:
child.terminate()
sys.exit(sig != SIGINT)
# sys.exit(sig)
for sig in (SIGINT, SIGABRT, SIGTERM):
signal(sig, terminate_children)
def generate_exec_key(keyfile):
import uuid
newkey = str(uuid.uuid4())
with open(keyfile, 'w') as f:
# f.write('ipython-key ')
f.write(newkey+'\n')
# set user-only RW permissions (0600)
# this will have no effect on Windows
os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
def make_base_argument_parser():
""" Creates an ArgumentParser for the generic arguments supported by all
ipcluster entry points.
"""
parser = ArgumentParser()
parser.add_argument('--ip', type=str, default='127.0.0.1',
help='set the controller\'s IP address [default: local]')
parser.add_argument('--transport', type=str, default='tcp',
help='set the transport to use [default: tcp]')
parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
help='set the XREP port for registration [default: 10101]')
parser.add_argument('--logport', type=int, metavar='PORT', default=0,
help='set the PUB port for remote logging [default: log to stdout]')
parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO,
help='set the log level [default: INFO]')
parser.add_argument('--ident', type=str,
help='set the ZMQ identity [default: random]')
parser.add_argument('--packer', type=str, default='json',
choices=['json','pickle'],
help='set the message format method [default: json]')
parser.add_argument('--url', type=str,
help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
parser.add_argument('--execkey', type=str,
help="File containing key for authenticating requests.")
return parser
def integer_loglevel(loglevel):
try:
loglevel = int(loglevel)
except ValueError:
if isinstance(loglevel, str):
loglevel = getattr(logging, loglevel)
return loglevel
def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
logger = logging.getLogger()
if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
# don't add a second PUBHandler
return
loglevel = integer_loglevel(loglevel)
lsock = context.socket(zmq.PUB)
lsock.connect(iface)
handler = handlers.PUBHandler(lsock)
handler.setLevel(loglevel)
handler.root_topic = root
logger.addHandler(handler)
logger.setLevel(loglevel)
def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
logger = logging.getLogger()
if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
# don't add a second PUBHandler
return
loglevel = integer_loglevel(loglevel)
lsock = context.socket(zmq.PUB)
lsock.connect(iface)
handler = EnginePUBHandler(engine, lsock)
handler.setLevel(loglevel)
logger.addHandler(handler)
logger.setLevel(loglevel)
def local_logger(loglevel=logging.DEBUG):
loglevel = integer_loglevel(loglevel)
logger = logging.getLogger()
if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
# don't add a second StreamHandler
return
handler = logging.StreamHandler()
handler.setLevel(loglevel)
logger.addHandler(handler)
logger.setLevel(loglevel)