diff --git a/IPython/kernel/clientconnector.py b/IPython/kernel/clientconnector.py index fca7e19..dc40d05 100644 --- a/IPython/kernel/clientconnector.py +++ b/IPython/kernel/clientconnector.py @@ -1,6 +1,4 @@ -#!/usr/bin/env python # encoding: utf-8 - """Facilities for handling client connections to the controller.""" #----------------------------------------------------------------------------- @@ -20,6 +18,8 @@ import os from IPython.kernel.fcutil import ( Tub, find_furl, + is_valid_furl, + is_valid_furl_file, is_valid_furl_or_file, validate_furl_or_file, FURLError @@ -66,18 +66,30 @@ class AsyncClientConnector(object): def _find_furl(self, profile='default', cluster_dir=None, furl_or_file=None, furl_file_name=None, ipython_dir=None): - """Find a FURL file by profile+ipython_dir or cluster dir. + """Find a FURL file. + + If successful, this returns a FURL file that exists on the file + system. The contents of the file have not been checked though. This + is because we often have to deal with FURL file whose buffers have + not been flushed. This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception if a FURL file can't be found. + + This tries the following: + + 1. By the name ``furl_or_file``. + 2. By ``cluster_dir`` and ``furl_file_name``. + 3. By cluster profile with a default of ``default``. This uses + ``ipython_dir``. """ # Try by furl_or_file if furl_or_file is not None: - validate_furl_or_file(furl_or_file) - return furl_or_file + if is_valid_furl_or_file(furl_or_file): + return furl_or_file if furl_file_name is None: - raise FURLError('A furl_file_name must be provided') + raise FURLError('A furl_file_name must be provided if furl_or_file is not') # Try by cluster_dir if cluster_dir is not None: @@ -151,7 +163,7 @@ class AsyncClientConnector(object): The full path to a cluster directory. This is useful if profiles are not being used. furl_or_file : str - A furl or a filename containing a FURLK. This is useful if you + A furl or a filename containing a FURL. This is useful if you simply know the location of the FURL file. ipython_dir : str The location of the ipython_dir if different from the default. @@ -193,7 +205,7 @@ class AsyncClientConnector(object): The full path to a cluster directory. This is useful if profiles are not being used. furl_or_file : str - A furl or a filename containing a FURLK. This is useful if you + A furl or a filename containing a FURL. This is useful if you simply know the location of the FURL file. ipython_dir : str The location of the ipython_dir if different from the default. @@ -259,6 +271,9 @@ class AsyncClientConnector(object): profile, cluster_dir, furl_or_file, furl_file_name, ipython_dir ) + # If this succeeds, we know the furl file exists and has a .furl + # extension, but it could still be empty. That is checked each + # connection attempt. except FURLError: return defer.fail(failure.Failure()) @@ -349,7 +364,7 @@ class ClientConnector(object): The full path to a cluster directory. This is useful if profiles are not being used. furl_or_file : str - A furl or a filename containing a FURLK. This is useful if you + A furl or a filename containing a FURL. This is useful if you simply know the location of the FURL file. ipython_dir : str The location of the ipython_dir if different from the default. @@ -390,7 +405,7 @@ class ClientConnector(object): The full path to a cluster directory. This is useful if profiles are not being used. furl_or_file : str - A furl or a filename containing a FURLK. This is useful if you + A furl or a filename containing a FURL. This is useful if you simply know the location of the FURL file. ipython_dir : str The location of the ipython_dir if different from the default. diff --git a/IPython/kernel/contexts.py b/IPython/kernel/contexts.py deleted file mode 100644 index e8ccc8e..0000000 --- a/IPython/kernel/contexts.py +++ /dev/null @@ -1,139 +0,0 @@ -# encoding: utf-8 -# -*- test-case-name: IPython.kernel.test.test_contexts -*- -"""Context managers for IPython. - -Python 2.5 introduced the `with` statement, which is based on the context -manager protocol. This module offers a few context managers for common cases, -which can also be useful as templates for writing new, application-specific -managers. -""" - -__docformat__ = "restructuredtext en" - -#------------------------------------------------------------------------------- -# Copyright (C) 2008 The IPython Development Team -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING, distributed as part of this software. -#------------------------------------------------------------------------------- - -#------------------------------------------------------------------------------- -# Imports -#------------------------------------------------------------------------------- - -import linecache -import sys - -from IPython.core.ultratb import findsource -from IPython.core import ipapi - -from IPython.kernel import error - -#--------------------------------------------------------------------------- -# Utility functions needed by all context managers. -#--------------------------------------------------------------------------- - -def remote(): - """Raises a special exception meant to be caught by context managers. - """ - m = 'Special exception to stop local execution of parallel code.' - raise error.StopLocalExecution(m) - - -def strip_whitespace(source,require_remote=True): - """strip leading whitespace from input source. - - :Parameters: - - """ - remote_mark = 'remote()' - # Expand tabs to avoid any confusion. - wsource = [l.expandtabs(4) for l in source] - # Detect the indentation level - done = False - for line in wsource: - if line.isspace(): - continue - for col,char in enumerate(line): - if char != ' ': - done = True - break - if done: - break - # Now we know how much leading space there is in the code. Next, we - # extract up to the first line that has less indentation. - # WARNINGS: we skip comments that may be misindented, but we do NOT yet - # detect triple quoted strings that may have flush left text. - for lno,line in enumerate(wsource): - lead = line[:col] - if lead.isspace(): - continue - else: - if not lead.lstrip().startswith('#'): - break - # The real 'with' source is up to lno - src_lines = [l[col:] for l in wsource[:lno+1]] - - # Finally, check that the source's first non-comment line begins with the - # special call 'remote()' - if require_remote: - for nline,line in enumerate(src_lines): - if line.isspace() or line.startswith('#'): - continue - if line.startswith(remote_mark): - break - else: - raise ValueError('%s call missing at the start of code' % - remote_mark) - out_lines = src_lines[nline+1:] - else: - # If the user specified that the remote() call wasn't mandatory - out_lines = src_lines - - # src = ''.join(out_lines) # dbg - #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg - return ''.join(out_lines) - -class RemoteContextBase(object): - def __init__(self): - self.ip = ipapi.get() - - def _findsource_file(self,f): - linecache.checkcache() - s = findsource(f.f_code) - lnum = f.f_lineno - wsource = s[0][f.f_lineno:] - return strip_whitespace(wsource) - - def _findsource_ipython(self,f): - from IPython.core import ipapi - self.ip = ipapi.get() - buf = self.ip.input_hist_raw[-1].splitlines()[1:] - wsource = [l+'\n' for l in buf ] - - return strip_whitespace(wsource) - - def findsource(self,frame): - local_ns = frame.f_locals - global_ns = frame.f_globals - if frame.f_code.co_filename == '': - src = self._findsource_ipython(frame) - else: - src = self._findsource_file(frame) - return src - - def __enter__(self): - raise NotImplementedError - - def __exit__ (self, etype, value, tb): - if issubclass(etype,error.StopLocalExecution): - return True - -class RemoteMultiEngine(RemoteContextBase): - def __init__(self,mec): - self.mec = mec - RemoteContextBase.__init__(self) - - def __enter__(self): - src = self.findsource(sys._getframe(1)) - return self.mec.execute(src) diff --git a/IPython/kernel/fcutil.py b/IPython/kernel/fcutil.py index a222401..2225be1 100644 --- a/IPython/kernel/fcutil.py +++ b/IPython/kernel/fcutil.py @@ -23,16 +23,16 @@ import tempfile from twisted.internet import reactor, defer from twisted.python import log +import foolscap from foolscap import Tub, UnauthenticatedTub from IPython.config.loader import Config - from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory - from IPython.kernel.error import SecurityError -from IPython.utils.traitlets import Int, Str, Bool, Instance from IPython.utils.importstring import import_item +from IPython.utils.path import expand_path +from IPython.utils.traitlets import Int, Str, Bool, Instance #----------------------------------------------------------------------------- # Code @@ -57,17 +57,17 @@ class FURLError(Exception): def check_furl_file_security(furl_file, secure): """Remove the old furl_file if changing security modes.""" + furl_file = expand_path(furl_file) if os.path.isfile(furl_file): - f = open(furl_file, 'r') - oldfurl = f.read().strip() - f.close() + with open(furl_file, 'r') as f: + oldfurl = f.read().strip() if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure): os.remove(furl_file) def is_secure(furl): """Is the given FURL secure or not.""" - if is_valid(furl): + if is_valid_furl(furl): if furl.startswith("pb://"): return True elif furl.startswith("pbu://"): @@ -76,26 +76,45 @@ def is_secure(furl): raise FURLError("invalid FURL: %s" % furl) -def is_valid(furl): +def is_valid_furl(furl): """Is the str a valid FURL or not.""" if isinstance(furl, str): if furl.startswith("pb://") or furl.startswith("pbu://"): return True + else: + return False else: return False +def is_valid_furl_file(furl_or_file): + """See if furl_or_file exists and contains a valid FURL. + + This doesn't try to read the contents because often we have to validate + FURL files that are created, but don't yet have a FURL written to them. + """ + if isinstance(furl_or_file, (str, unicode)): + path, furl_filename = os.path.split(furl_or_file) + if os.path.isdir(path) and furl_filename.endswith('.furl'): + return True + return False + + def find_furl(furl_or_file): - """Find, validate and return a FURL in a string or file.""" - if isinstance(furl_or_file, str): - if is_valid(furl_or_file): - return furl_or_file - if os.path.isfile(furl_or_file): + """Find, validate and return a FURL in a string or file. + + This calls :func:`IPython.utils.path.expand_path` on the argument to + properly handle ``~`` and ``$`` variables in the path. + """ + if is_valid_furl(furl_or_file): + return furl_or_file + furl_or_file = expand_path(furl_or_file) + if is_valid_furl_file(furl_or_file): with open(furl_or_file, 'r') as f: furl = f.read().strip() - if is_valid(furl): + if is_valid_furl(furl): return furl - raise FURLError("Not a valid FURL or FURL file: %s" % furl_or_file) + raise FURLError("Not a valid FURL or FURL file: %r" % furl_or_file) def is_valid_furl_or_file(furl_or_file): @@ -106,17 +125,14 @@ def is_valid_furl_or_file(furl_or_file): if the FURL file exists or to read its contents. This is useful for cases where auto re-connection is being used. """ - if isinstance(furl_or_file, str): - if is_valid(furl_or_file): - return True - if isinstance(furl_or_file, (str, unicode)): - path, furl_filename = os.path.split(furl_or_file) - if os.path.isdir(path) and furl_filename.endswith('.furl'): - return True - return False + if is_valid_furl(furl_or_file) or is_valid_furl_file(furl_or_file): + return True + else: + return False def validate_furl_or_file(furl_or_file): + """Like :func:`is_valid_furl_or_file`, but raises an error.""" if not is_valid_furl_or_file(furl_or_file): raise FURLError('Not a valid FURL or FURL file: %r' % furl_or_file) diff --git a/IPython/kernel/multiengineclient.py b/IPython/kernel/multiengineclient.py index ccdb51b..e3f0446 100644 --- a/IPython/kernel/multiengineclient.py +++ b/IPython/kernel/multiengineclient.py @@ -17,12 +17,12 @@ __docformat__ = "restructuredtext en" #------------------------------------------------------------------------------- import sys -import linecache import warnings from twisted.python import components from twisted.python.failure import Failure from zope.interface import Interface, implements, Attribute +from foolscap import DeadReferenceError from IPython.utils.coloransi import TermColors @@ -306,85 +306,6 @@ class InteractiveMultiEngineClient(object): def __len__(self): """Return the number of available engines.""" return len(self.get_ids()) - - #--------------------------------------------------------------------------- - # Make this a context manager for with - #--------------------------------------------------------------------------- - - def findsource_file(self,f): - linecache.checkcache() - s = findsource(f.f_code) # findsource is not defined! - lnum = f.f_lineno - wsource = s[0][f.f_lineno:] - return strip_whitespace(wsource) - - def findsource_ipython(self,f): - from IPython.core import ipapi - self.ip = ipapi.get() - wsource = [l+'\n' for l in - self.ip.input_hist_raw[-1].splitlines()[1:]] - return strip_whitespace(wsource) - - def __enter__(self): - f = sys._getframe(1) - local_ns = f.f_locals - global_ns = f.f_globals - if f.f_code.co_filename == '': - s = self.findsource_ipython(f) - else: - s = self.findsource_file(f) - - self._with_context_result = self.execute(s) - - def __exit__ (self, etype, value, tb): - if issubclass(etype,error.StopLocalExecution): - return True - - -def remote(): - m = 'Special exception to stop local execution of parallel code.' - raise error.StopLocalExecution(m) - -def strip_whitespace(source): - # Expand tabs to avoid any confusion. - wsource = [l.expandtabs(4) for l in source] - # Detect the indentation level - done = False - for line in wsource: - if line.isspace(): - continue - for col,char in enumerate(line): - if char != ' ': - done = True - break - if done: - break - # Now we know how much leading space there is in the code. Next, we - # extract up to the first line that has less indentation. - # WARNINGS: we skip comments that may be misindented, but we do NOT yet - # detect triple quoted strings that may have flush left text. - for lno,line in enumerate(wsource): - lead = line[:col] - if lead.isspace(): - continue - else: - if not lead.lstrip().startswith('#'): - break - # The real 'with' source is up to lno - src_lines = [l[col:] for l in wsource[:lno+1]] - - # Finally, check that the source's first non-comment line begins with the - # special call 'remote()' - for nline,line in enumerate(src_lines): - if line.isspace() or line.startswith('#'): - continue - if 'remote()' in line: - break - else: - raise ValueError('remote() call missing at the start of code') - src = ''.join(src_lines[nline+1:]) - #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg - return src #------------------------------------------------------------------------------- @@ -444,18 +365,31 @@ class FullBlockingMultiEngineClient(InteractiveMultiEngineClient): def _findTargetsAndBlock(self, targets=None, block=None): return self._findTargets(targets), self._findBlock(block) - + + def _bcft(self, *args, **kwargs): + try: + result = blockingCallFromThread(*args, **kwargs) + except DeadReferenceError: + raise error.ConnectionError( + """A connection error has occurred in trying to connect to the + controller. This is usually caused by the controller dying or + being restarted. To resolve this issue try recreating the + multiengine client.""" + ) + else: + return result + def _blockFromThread(self, function, *args, **kwargs): block = kwargs.get('block', None) if block is None: raise error.MissingBlockArgument("'block' keyword argument is missing") - result = blockingCallFromThread(function, *args, **kwargs) + result = self._bcft(function, *args, **kwargs) if not block: result = PendingResult(self, result) return result def get_pending_deferred(self, deferredID, block): - return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block) + return self._bcft(self.smultiengine.get_pending_deferred, deferredID, block) def barrier(self, pendingResults): """Synchronize a set of `PendingResults`. @@ -505,7 +439,7 @@ class FullBlockingMultiEngineClient(InteractiveMultiEngineClient): controller. This method allows the user to clear out all un-retrieved results on the controller. """ - r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds) + r = self._bcft(self.smultiengine.clear_pending_deferreds) return r clear_pending_results = flush @@ -529,7 +463,7 @@ class FullBlockingMultiEngineClient(InteractiveMultiEngineClient): at a later time. """ targets, block = self._findTargetsAndBlock(targets, block) - result = blockingCallFromThread(self.smultiengine.execute, lines, + result = self._bcft(self.smultiengine.execute, lines, targets=targets, block=block) if block: result = ResultList(result) @@ -647,7 +581,7 @@ class FullBlockingMultiEngineClient(InteractiveMultiEngineClient): at a later time. """ targets, block = self._findTargetsAndBlock(targets, block) - result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block) + result = self._bcft(self.smultiengine.get_result, i, targets=targets, block=block) if block: result = ResultList(result) else: @@ -773,7 +707,7 @@ class FullBlockingMultiEngineClient(InteractiveMultiEngineClient): """ Returns the ids of currently registered engines. """ - result = blockingCallFromThread(self.smultiengine.get_ids) + result = self._bcft(self.smultiengine.get_ids) return result #--------------------------------------------------------------------------- diff --git a/IPython/kernel/taskclient.py b/IPython/kernel/taskclient.py index 3215da4..b639bd4 100644 --- a/IPython/kernel/taskclient.py +++ b/IPython/kernel/taskclient.py @@ -20,9 +20,10 @@ __docformat__ = "restructuredtext en" from zope.interface import Interface, implements from twisted.python import components +from foolscap import DeadReferenceError from IPython.kernel.twistedutil import blockingCallFromThread -from IPython.kernel import task +from IPython.kernel import task, error from IPython.kernel.mapper import ( SynchronousTaskMapper, ITaskMapperFactory, @@ -58,7 +59,20 @@ class BlockingTaskClient(object): def __init__(self, task_controller): self.task_controller = task_controller self.block = True - + + def _bcft(self, *args, **kwargs): + try: + result = blockingCallFromThread(*args, **kwargs) + except DeadReferenceError: + raise error.ConnectionError( + """A connection error has occurred in trying to connect to the + controller. This is usually caused by the controller dying or + being restarted. To resolve this issue try recreating the + task client.""" + ) + else: + return result + def run(self, task, block=False): """Run a task on the `TaskController`. @@ -71,7 +85,7 @@ class BlockingTaskClient(object): :Returns: The int taskid of the submitted task. Pass this to `get_task_result` to get the `TaskResult` object. """ - tid = blockingCallFromThread(self.task_controller.run, task) + tid = self._bcft(self.task_controller.run, task) if block: return self.get_task_result(tid, block=True) else: @@ -89,7 +103,7 @@ class BlockingTaskClient(object): :Returns: A `TaskResult` object that encapsulates the task result. """ - return blockingCallFromThread(self.task_controller.get_task_result, + return self._bcft(self.task_controller.get_task_result, taskid, block) def abort(self, taskid): @@ -100,7 +114,7 @@ class BlockingTaskClient(object): taskid : int The taskid of the task to be aborted. """ - return blockingCallFromThread(self.task_controller.abort, taskid) + return self._bcft(self.task_controller.abort, taskid) def barrier(self, taskids): """Block until a set of tasks are completed. @@ -109,7 +123,7 @@ class BlockingTaskClient(object): taskids : list, tuple A sequence of taskids to block on. """ - return blockingCallFromThread(self.task_controller.barrier, taskids) + return self._bcft(self.task_controller.barrier, taskids) def spin(self): """ @@ -118,7 +132,7 @@ class BlockingTaskClient(object): This method only needs to be called in unusual situations where the scheduler is idle for some reason. """ - return blockingCallFromThread(self.task_controller.spin) + return self._bcft(self.task_controller.spin) def queue_status(self, verbose=False): """ @@ -132,7 +146,7 @@ class BlockingTaskClient(object): :Returns: A dict with the queue status. """ - return blockingCallFromThread(self.task_controller.queue_status, verbose) + return self._bcft(self.task_controller.queue_status, verbose) def clear(self): """ @@ -143,7 +157,7 @@ class BlockingTaskClient(object): tasks. Users should call this periodically to clean out these cached task results. """ - return blockingCallFromThread(self.task_controller.clear) + return self._bcft(self.task_controller.clear) def map(self, func, *sequences): """ diff --git a/IPython/kernel/tests/test_contexts.py b/IPython/kernel/tests/test_contexts.py deleted file mode 100644 index fe726c3..0000000 --- a/IPython/kernel/tests/test_contexts.py +++ /dev/null @@ -1,46 +0,0 @@ -# Tell nose to skip this module -__test__ = {} - -#from __future__ import with_statement - -# XXX This file is currently disabled to preserve 2.4 compatibility. - -#def test_simple(): -if 0: - - # XXX - for now, we need a running cluster to be started separately. The - # daemon work is almost finished, and will make much of this unnecessary. - from IPython.kernel import client - mec = client.MultiEngineClient(('127.0.0.1',10105)) - - try: - mec.get_ids() - except ConnectionRefusedError: - import os, time - os.system('ipcluster -n 2 &') - time.sleep(2) - mec = client.MultiEngineClient(('127.0.0.1',10105)) - - mec.block = False - - import itertools - c = itertools.count() - - parallel = RemoteMultiEngine(mec) - - mec.pushAll() - - ## with parallel as pr: - ## # A comment - ## remote() # this means the code below only runs remotely - ## print 'Hello remote world' - ## x = range(10) - ## # Comments are OK - ## # Even misindented. - ## y = x+1 - - - ## with pfor('i',sequence) as pr: - ## print x[i] - - print pr.x + pr.y diff --git a/IPython/kernel/tests/test_enginefc.py b/IPython/kernel/tests/test_enginefc.py index 502b421..1110652 100644 --- a/IPython/kernel/tests/test_enginefc.py +++ b/IPython/kernel/tests/test_enginefc.py @@ -53,8 +53,8 @@ class EngineFCTest(DeferredTestCase, # Start a server and append to self.servers self.controller_reference = FCRemoteEngineRefFromService(self) self.controller_tub = Tub() - self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1') - self.controller_tub.setLocation('127.0.0.1:10105') + self.controller_tub.listenOn('tcp:10111:interface=127.0.0.1') + self.controller_tub.setLocation('127.0.0.1:10111') furl = self.controller_tub.registerReference(self.controller_reference) self.controller_tub.startService() diff --git a/IPython/kernel/tests/test_multienginefc.py b/IPython/kernel/tests/test_multienginefc.py index 340c617..205f320 100644 --- a/IPython/kernel/tests/test_multienginefc.py +++ b/IPython/kernel/tests/test_multienginefc.py @@ -27,7 +27,7 @@ from IPython.kernel.multiengine import IMultiEngine from IPython.kernel.tests.multienginetest import IFullSynchronousMultiEngineTestCase from IPython.kernel.multienginefc import IFCSynchronousMultiEngine from IPython.kernel import multiengine as me -from IPython.kernel.clientconnector import ClientConnector +from IPython.kernel.clientconnector import AsyncClientConnector from IPython.kernel.parallelfunction import ParallelFunction from IPython.kernel.error import CompositeError from IPython.kernel.util import printer @@ -40,37 +40,30 @@ def _raise_it(f): e.raise_exception() -class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMultiEngineTestCase): +class FullSynchronousMultiEngineTestCase( + DeferredTestCase, IFullSynchronousMultiEngineTestCase): - # XXX (fperez) this is awful: I'm fully disabling this entire test class. - # Right now it's blocking the tests from running at all, and I don't know - # how to fix it. I hope Brian can have a stab at it, but at least by doing - # this we can run the entire suite to completion. - # Once the problem is cleared, remove this skip method. - skip = True - # END XXX - def setUp(self): - + self.engines = [] - + self.controller = ControllerService() self.controller.startService() self.imultiengine = IMultiEngine(self.controller) self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine) self.controller_tub = Tub() - self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1') - self.controller_tub.setLocation('127.0.0.1:10105') - + self.controller_tub.listenOn('tcp:10111:interface=127.0.0.1') + self.controller_tub.setLocation('127.0.0.1:10111') + furl = self.controller_tub.registerReference(self.mec_referenceable) self.controller_tub.startService() - - self.client_tub = ClientConnector() - d = self.client_tub.get_multiengine_client(furl) + + self.client_tub = AsyncClientConnector() + d = self.client_tub.get_multiengine_client(furl_or_file=furl) d.addCallback(self.handle_got_client) return d - + def handle_got_client(self, client): self.multiengine = client @@ -95,7 +88,7 @@ class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti self.assertEquals(m.dist,'b') self.assertEquals(m.targets,'all') self.assertEquals(m.block,True) - + def test_map_default(self): self.addEngine(4) m = self.multiengine.mapper() @@ -104,7 +97,7 @@ class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10))) d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) return d - + def test_map_noblock(self): self.addEngine(4) m = self.multiengine.mapper(block=False) @@ -112,14 +105,14 @@ class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) return d - + def test_mapper_fail(self): self.addEngine(4) m = self.multiengine.mapper() d = m.map(lambda x: 1/0, range(10)) d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f)) return d - + def test_parallel(self): self.addEngine(4) p = self.multiengine.parallel() @@ -129,7 +122,7 @@ class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti d = f(range(10)) d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) return d - + def test_parallel_noblock(self): self.addEngine(1) p = self.multiengine.parallel(block=False) @@ -140,7 +133,7 @@ class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) return d - + def test_parallel_fail(self): self.addEngine(4) p = self.multiengine.parallel() @@ -150,3 +143,4 @@ class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti d = f(range(10)) d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f)) return d + diff --git a/IPython/kernel/tests/test_taskfc.py b/IPython/kernel/tests/test_taskfc.py index 589cf3e..79958c3 100644 --- a/IPython/kernel/tests/test_taskfc.py +++ b/IPython/kernel/tests/test_taskfc.py @@ -31,7 +31,7 @@ from IPython.kernel.multienginefc import IFCSynchronousMultiEngine from IPython.kernel.taskfc import IFCTaskController from IPython.kernel.util import printer from IPython.kernel.tests.tasktest import ITaskControllerTestCase -from IPython.kernel.clientconnector import ClientConnector +from IPython.kernel.clientconnector import AsyncClientConnector from IPython.kernel.error import CompositeError from IPython.kernel.parallelfunction import ParallelFunction @@ -48,42 +48,34 @@ def _raise_it(f): class TaskTest(DeferredTestCase, ITaskControllerTestCase): - # XXX (fperez) this is awful: I'm fully disabling this entire test class. - # Right now it's blocking the tests from running at all, and I don't know - # how to fix it. I hope Brian can have a stab at it, but at least by doing - # this we can run the entire suite to completion. - # Once the problem is cleared, remove this skip method. - skip = True - # END XXX - def setUp(self): - + self.engines = [] - + self.controller = cs.ControllerService() self.controller.startService() self.imultiengine = me.IMultiEngine(self.controller) self.itc = taskmodule.ITaskController(self.controller) self.itc.failurePenalty = 0 - + self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine) self.tc_referenceable = IFCTaskController(self.itc) - + self.controller_tub = Tub() - self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1') - self.controller_tub.setLocation('127.0.0.1:10105') - + self.controller_tub.listenOn('tcp:10111:interface=127.0.0.1') + self.controller_tub.setLocation('127.0.0.1:10111') + mec_furl = self.controller_tub.registerReference(self.mec_referenceable) tc_furl = self.controller_tub.registerReference(self.tc_referenceable) self.controller_tub.startService() - - self.client_tub = ClientConnector() - d = self.client_tub.get_multiengine_client(mec_furl) + + self.client_tub = AsyncClientConnector() + d = self.client_tub.get_multiengine_client(furl_or_file=mec_furl) d.addCallback(self.handle_mec_client) - d.addCallback(lambda _: self.client_tub.get_task_client(tc_furl)) + d.addCallback(lambda _: self.client_tub.get_task_client(furl_or_file=tc_furl)) d.addCallback(self.handle_tc_client) return d - + def handle_mec_client(self, client): self.multiengine = client @@ -103,7 +95,7 @@ class TaskTest(DeferredTestCase, ITaskControllerTestCase): d.addBoth(lambda _: self.controller.stopService()) dlist.append(d) return defer.DeferredList(dlist) - + def test_mapper(self): self.addEngine(1) m = self.tc.mapper() @@ -114,7 +106,7 @@ class TaskTest(DeferredTestCase, ITaskControllerTestCase): self.assertEquals(m.recovery_task,None) self.assertEquals(m.depend,None) self.assertEquals(m.block,True) - + def test_map_default(self): self.addEngine(1) m = self.tc.mapper() @@ -123,21 +115,21 @@ class TaskTest(DeferredTestCase, ITaskControllerTestCase): d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10))) d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) return d - + def test_map_noblock(self): self.addEngine(1) m = self.tc.mapper(block=False) d = m.map(lambda x: 2*x, range(10)) d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)])) return d - + def test_mapper_fail(self): self.addEngine(1) m = self.tc.mapper() d = m.map(lambda x: 1/0, range(10)) d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f)) return d - + def test_parallel(self): self.addEngine(1) p = self.tc.parallel() @@ -147,7 +139,7 @@ class TaskTest(DeferredTestCase, ITaskControllerTestCase): d = f(range(10)) d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) return d - + def test_parallel_noblock(self): self.addEngine(1) p = self.tc.parallel(block=False) @@ -157,7 +149,7 @@ class TaskTest(DeferredTestCase, ITaskControllerTestCase): d = f(range(10)) d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)])) return d - + def test_parallel_fail(self): self.addEngine(1) p = self.tc.parallel() @@ -167,3 +159,4 @@ class TaskTest(DeferredTestCase, ITaskControllerTestCase): d = f(range(10)) d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f)) return d +