##// END OF EJS Templates
Improvements to dependency handling...
Improvements to dependency handling Specifically: * add 'success_only' switch to Dependencies * Scheduler handles some cases where Dependencies are impossible to meet.

File last commit:

r3604:2c044319
r3607:d51586b9
Show More
factory.py
149 lines | 5.4 KiB | text/x-python | PythonLexer
MinRK
Refactor newparallel to use Config system...
r3604 """Base config factories."""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import os
import logging
import uuid
from zmq.eventloop.ioloop import IOLoop
from IPython.config.configurable import Configurable
from IPython.utils.traitlets import Str,Int,Instance, CUnicode, CStr
from IPython.utils.importstring import import_item
from IPython.zmq.parallel.entry_point import select_random_ports
import IPython.zmq.parallel.streamsession as ss
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
class SessionFactory(Configurable):
"""The Base factory from which every factory in IPython.zmq.parallel inherits"""
packer = Str('',config=True)
unpacker = Str('',config=True)
ident = CStr('',config=True)
def _ident_default(self):
return str(uuid.uuid4())
username = Str(os.environ.get('USER','username'),config=True)
exec_key = CUnicode('',config=True)
# not configurable:
context = Instance('zmq.Context', (), {})
session = Instance('IPython.zmq.parallel.streamsession.StreamSession')
loop = Instance('zmq.eventloop.ioloop.IOLoop')
def _loop_default(self):
return IOLoop.instance()
def __init__(self, **kwargs):
super(SessionFactory, self).__init__(**kwargs)
keyfile = self.exec_key or None
# set the packers:
if not self.packer:
packer_f = unpacker_f = None
elif self.packer.lower() == 'json':
packer_f = ss.json_packer
unpacker_f = ss.json_unpacker
elif self.packer.lower() == 'pickle':
packer_f = ss.pickle_packer
unpacker_f = ss.pickle_unpacker
else:
packer_f = import_item(self.packer)
unpacker_f = import_item(self.unpacker)
# construct the session
self.session = ss.StreamSession(self.username, self.ident, packer=packer_f, unpacker=unpacker_f, keyfile=keyfile)
class RegistrationFactory(SessionFactory):
"""The Base Configurable for objects that involve registration."""
url = Str('', config=True) # url takes precedence over ip,regport,transport
transport = Str('tcp', config=True)
ip = Str('127.0.0.1', config=True)
regport = Instance(int, config=True)
def _regport_default(self):
return 10101
# return select_random_ports(1)[0]
def __init__(self, **kwargs):
super(RegistrationFactory, self).__init__(**kwargs)
self._propagate_url()
self._rebuild_url()
self.on_trait_change(self._propagate_url, 'url')
self.on_trait_change(self._rebuild_url, 'ip')
self.on_trait_change(self._rebuild_url, 'transport')
self.on_trait_change(self._rebuild_url, 'regport')
def _rebuild_url(self):
self.url = "%s://%s:%i"%(self.transport, self.ip, self.regport)
def _propagate_url(self):
"""Ensure self.url contains full transport://interface:port"""
if self.url:
iface = self.url.split('://',1)
if len(iface) == 2:
self.transport,iface = iface
iface = iface.split(':')
self.ip = iface[0]
if iface[1]:
self.regport = int(iface[1])
#-----------------------------------------------------------------------------
# argparse argument extenders
#-----------------------------------------------------------------------------
def add_session_arguments(parser):
paa = parser.add_argument
paa('--ident',
type=str, dest='SessionFactory.ident',
help='set the ZMQ and session identity [default: random uuid]',
metavar='identity')
# paa('--execkey',
# type=str, dest='SessionFactory.exec_key',
# help='path to a file containing an execution key.',
# metavar='execkey')
paa('--packer',
type=str, dest='SessionFactory.packer',
help='method to serialize messages: {json,pickle} [default: json]',
metavar='packer')
paa('--unpacker',
type=str, dest='SessionFactory.unpacker',
help='inverse function of `packer`. Only necessary when using something other than json|pickle',
metavar='packer')
def add_registration_arguments(parser):
paa = parser.add_argument
paa('--ip',
type=str, dest='RegistrationFactory.ip',
help="The IP used for registration [default: localhost]",
metavar='ip')
paa('--transport',
type=str, dest='RegistrationFactory.transport',
help="The ZeroMQ transport used for registration [default: tcp]",
metavar='transport')
paa('--url',
type=str, dest='RegistrationFactory.url',
help='set transport,ip,regport in one go, e.g. tcp://127.0.0.1:10101',
metavar='url')
paa('--regport',
type=int, dest='RegistrationFactory.regport',
help="The port used for registration [default: 10101]",
metavar='ip')