##// END OF EJS Templates
better handle aborted/unschedulers tasks
better handle aborted/unschedulers tasks

File last commit:

r3673:b9f54806
r3687:9566c9d2
Show More
factory.py
152 lines | 5.7 KiB | text/x-python | PythonLexer
MinRK
Refactor newparallel to use Config system...
r3604 """Base config factories."""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import logging
MinRK
resort imports in a cleaner order
r3631 import os
MinRK
Refactor newparallel to use Config system...
r3604 import uuid
from zmq.eventloop.ioloop import IOLoop
from IPython.config.configurable import Configurable
from IPython.utils.importstring import import_item
MinRK
resort imports in a cleaner order
r3631 from IPython.utils.traitlets import Str,Int,Instance, CUnicode, CStr
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 import IPython.parallel.streamsession as ss
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.util import select_random_ports
MinRK
Refactor newparallel to use Config system...
r3604
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
MinRK
rework logging connections
class LoggingFactory(Configurable):
    """A most basic class, that has a `log` (type:`Logger`) attribute.

    The logger is selected via the `logname` Trait: assigning a new value to
    `logname` replaces `log` with ``logging.getLogger(<new name>)``.
    """

    # Default logger instance, built from the arguments ('ZMQ', WARNING).
    log = Instance('logging.Logger', args=('ZMQ', logging.WARNING))

    # Name of the logger to use; see `_logname_changed`.
    logname = CUnicode('ZMQ')

    def _logname_changed(self, name, old, new):
        """Swap in the logger registered under the new `logname` value."""
        replacement = logging.getLogger(new)
        self.log = replacement
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
rework logging connections
class SessionFactory(LoggingFactory):
    """The Base factory from which every factory in IPython.parallel inherits.

    On construction the configured `packer`/`unpacker` names are resolved to
    callables and a `StreamSession` is created and stored as `self.session`.
    """

    # configurable:
    packer = Str('', config=True)
    unpacker = Str('', config=True)
    ident = CStr('', config=True)
    username = CUnicode(os.environ.get('USER', 'username'), config=True)
    exec_key = CUnicode('', config=True)

    # not configurable:
    context = Instance('zmq.Context', (), {})
    session = Instance('IPython.parallel.streamsession.StreamSession')
    loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)

    def _ident_default(self):
        """Default identity: a freshly generated random UUID string."""
        return str(uuid.uuid4())

    def _loop_default(self):
        """Default event loop: the shared `IOLoop` instance."""
        return IOLoop.instance()

    def __init__(self, **kwargs):
        """Initialise the traits from `kwargs`, then build `self.session`."""
        super(SessionFactory, self).__init__(**kwargs)

        # An empty exec_key is handed to the session as None.
        session_key = self.exec_key or None

        # Resolve the packer/unpacker pair.  With no packer configured both
        # stay None and the choice is left to StreamSession.
        packer_f = unpacker_f = None
        if self.packer:
            packer_kind = self.packer.lower()
            if packer_kind == 'json':
                packer_f, unpacker_f = ss.json_packer, ss.json_unpacker
            elif packer_kind == 'pickle':
                packer_f, unpacker_f = ss.pickle_packer, ss.pickle_unpacker
            else:
                # Anything else is treated as an importable dotted name.
                packer_f = import_item(self.packer)
                unpacker_f = import_item(self.unpacker)

        # construct the session
        self.session = ss.StreamSession(
            self.username,
            self.ident,
            packer=packer_f,
            unpacker=unpacker_f,
            key=session_key
        )
MinRK
Refactor newparallel to use Config system...
r3604
class RegistrationFactory(SessionFactory):
    """The Base Configurable for objects that involve registration.

    The registration endpoint is described both as a single `url` and as its
    parts (`transport`, `ip`, `regport`).  The two views are kept in sync:
    changing `url` updates the parts, and changing any part rebuilds `url`.
    """

    url = Str('', config=True) # url takes precedence over ip,regport,transport
    transport = Str('tcp', config=True)
    ip = Str('127.0.0.1', config=True)
    regport = Instance(int, config=True)

    def _regport_default(self):
        """Default registration port: one randomly selected port."""
        # return 10101
        return select_random_ports(1)[0]

    def __init__(self, **kwargs):
        """Initialise the traits, reconcile url with its parts, and keep them synced."""
        super(RegistrationFactory, self).__init__(**kwargs)
        # Apply an explicitly given url to the parts first (url wins), then
        # rebuild url so that it always carries transport, ip and port.
        self._propagate_url()
        self._rebuild_url()
        self.on_trait_change(self._propagate_url, 'url')
        self.on_trait_change(self._rebuild_url, 'ip')
        self.on_trait_change(self._rebuild_url, 'transport')
        self.on_trait_change(self._rebuild_url, 'regport')

    def _rebuild_url(self):
        """Recompose `url` as ``transport://ip:regport`` from the current parts."""
        self.url = "%s://%s:%i"%(self.transport, self.ip, self.regport)

    def _propagate_url(self):
        """Split `self.url` into `transport`, `ip` and `regport`.

        Parts that are absent from the url are left untouched: a url without
        ``transport://`` keeps the current transport, and a url without
        ``:port`` keeps the current regport.  An empty url is a no-op.

        Raises ValueError if the port component is not an integer.
        """
        if not self.url:
            return

        # Parse everything into locals before assigning any trait: each
        # assignment below can fire change handlers that rewrite self.url.
        pieces = self.url.split('://', 1)
        if len(pieces) == 2:
            new_transport, address = pieces
        else:
            # No scheme given; the whole url is the interface[:port] part.
            new_transport, address = None, pieces[0]
        address_parts = address.split(':')

        if new_transport is not None:
            self.transport = new_transport
        self.ip = address_parts[0]
        # Guard against a missing port instead of indexing blindly.
        if len(address_parts) > 1 and address_parts[1]:
            self.regport = int(address_parts[1])
#-----------------------------------------------------------------------------
# argparse argument extenders
#-----------------------------------------------------------------------------
def add_session_arguments(parser):
    """Add the session-related command-line options to `parser`.

    Registers ``--ident``, ``--packer`` and ``--unpacker``.  Each value is
    stored under a dotted dest of the form ``SessionFactory.<name>``.

    Parameters
    ----------
    parser : object providing ``add_argument`` (e.g. argparse.ArgumentParser)
        The parser to extend; it is modified in place.
    """
    paa = parser.add_argument
    paa('--ident',
        type=str, dest='SessionFactory.ident',
        help='set the ZMQ and session identity [default: random uuid]',
        metavar='identity')
    # paa('--execkey',
    #     type=str, dest='SessionFactory.exec_key',
    #     help='path to a file containing an execution key.',
    #     metavar='execkey')
    paa('--packer',
        type=str, dest='SessionFactory.packer',
        help='method to serialize messages: {json,pickle} [default: json]',
        metavar='packer')
    # metavar names the option's own value (it previously said 'packer').
    paa('--unpacker',
        type=str, dest='SessionFactory.unpacker',
        help='inverse function of `packer`. Only necessary when using something other than json|pickle',
        metavar='unpacker')
def add_registration_arguments(parser):
    """Add the registration-related command-line options to `parser`.

    Registers ``--ip``, ``--transport``, ``--url`` and ``--regport``.  Each
    value is stored under a dotted dest of the form
    ``RegistrationFactory.<name>``; ``--regport`` is parsed as an int.

    Parameters
    ----------
    parser : object providing ``add_argument`` (e.g. argparse.ArgumentParser)
        The parser to extend; it is modified in place.
    """
    paa = parser.add_argument
    paa('--ip',
        type=str, dest='RegistrationFactory.ip',
        help="The IP used for registration [default: localhost]",
        metavar='ip')
    paa('--transport',
        type=str, dest='RegistrationFactory.transport',
        help="The ZeroMQ transport used for registration [default: tcp]",
        metavar='transport')
    paa('--url',
        type=str, dest='RegistrationFactory.url',
        help='set transport,ip,regport in one go, e.g. tcp://127.0.0.1:10101',
        metavar='url')
    # metavar previously said 'ip', and the help advertised a fixed default
    # of 10101 although RegistrationFactory picks a random port by default.
    paa('--regport',
        type=int, dest='RegistrationFactory.regport',
        help="The port used for registration [default: random port]",
        metavar='regport')