##// END OF EJS Templates
Fixed bugs in IPython.kernel....
Brian Granger -
Show More
@@ -1,6 +1,4 b''
1 #!/usr/bin/env python
2 1 # encoding: utf-8
3
4 2 """Facilities for handling client connections to the controller."""
5 3
6 4 #-----------------------------------------------------------------------------
@@ -20,6 +18,8 b' import os'
20 18 from IPython.kernel.fcutil import (
21 19 Tub,
22 20 find_furl,
21 is_valid_furl,
22 is_valid_furl_file,
23 23 is_valid_furl_or_file,
24 24 validate_furl_or_file,
25 25 FURLError
@@ -66,18 +66,30 b' class AsyncClientConnector(object):'
66 66 def _find_furl(self, profile='default', cluster_dir=None,
67 67 furl_or_file=None, furl_file_name=None,
68 68 ipython_dir=None):
69 """Find a FURL file by profile+ipython_dir or cluster dir.
69 """Find a FURL file.
70
71 If successful, this returns a FURL file that exists on the file
72 system. The contents of the file have not been checked though. This
73 is because we often have to deal with FURL file whose buffers have
74 not been flushed.
70 75
71 76 This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception
72 77 if a FURL file can't be found.
78
79 This tries the following:
80
81 1. By the name ``furl_or_file``.
82 2. By ``cluster_dir`` and ``furl_file_name``.
83 3. By cluster profile with a default of ``default``. This uses
84 ``ipython_dir``.
73 85 """
74 86 # Try by furl_or_file
75 87 if furl_or_file is not None:
76 validate_furl_or_file(furl_or_file)
77 return furl_or_file
88 if is_valid_furl_or_file(furl_or_file):
89 return furl_or_file
78 90
79 91 if furl_file_name is None:
80 raise FURLError('A furl_file_name must be provided')
92 raise FURLError('A furl_file_name must be provided if furl_or_file is not')
81 93
82 94 # Try by cluster_dir
83 95 if cluster_dir is not None:
@@ -151,7 +163,7 b' class AsyncClientConnector(object):'
151 163 The full path to a cluster directory. This is useful if profiles
152 164 are not being used.
153 165 furl_or_file : str
154 A furl or a filename containing a FURLK. This is useful if you
166 A furl or a filename containing a FURL. This is useful if you
155 167 simply know the location of the FURL file.
156 168 ipython_dir : str
157 169 The location of the ipython_dir if different from the default.
@@ -193,7 +205,7 b' class AsyncClientConnector(object):'
193 205 The full path to a cluster directory. This is useful if profiles
194 206 are not being used.
195 207 furl_or_file : str
196 A furl or a filename containing a FURLK. This is useful if you
208 A furl or a filename containing a FURL. This is useful if you
197 209 simply know the location of the FURL file.
198 210 ipython_dir : str
199 211 The location of the ipython_dir if different from the default.
@@ -259,6 +271,9 b' class AsyncClientConnector(object):'
259 271 profile, cluster_dir, furl_or_file,
260 272 furl_file_name, ipython_dir
261 273 )
274 # If this succeeds, we know the furl file exists and has a .furl
275 # extension, but it could still be empty. That is checked each
276 # connection attempt.
262 277 except FURLError:
263 278 return defer.fail(failure.Failure())
264 279
@@ -349,7 +364,7 b' class ClientConnector(object):'
349 364 The full path to a cluster directory. This is useful if profiles
350 365 are not being used.
351 366 furl_or_file : str
352 A furl or a filename containing a FURLK. This is useful if you
367 A furl or a filename containing a FURL. This is useful if you
353 368 simply know the location of the FURL file.
354 369 ipython_dir : str
355 370 The location of the ipython_dir if different from the default.
@@ -390,7 +405,7 b' class ClientConnector(object):'
390 405 The full path to a cluster directory. This is useful if profiles
391 406 are not being used.
392 407 furl_or_file : str
393 A furl or a filename containing a FURLK. This is useful if you
408 A furl or a filename containing a FURL. This is useful if you
394 409 simply know the location of the FURL file.
395 410 ipython_dir : str
396 411 The location of the ipython_dir if different from the default.
@@ -23,16 +23,16 b' import tempfile'
23 23 from twisted.internet import reactor, defer
24 24 from twisted.python import log
25 25
26 import foolscap
26 27 from foolscap import Tub, UnauthenticatedTub
27 28
28 29 from IPython.config.loader import Config
29
30 30 from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory
31
32 31 from IPython.kernel.error import SecurityError
33 32
34 from IPython.utils.traitlets import Int, Str, Bool, Instance
35 33 from IPython.utils.importstring import import_item
34 from IPython.utils.path import expand_path
35 from IPython.utils.traitlets import Int, Str, Bool, Instance
36 36
37 37 #-----------------------------------------------------------------------------
38 38 # Code
@@ -57,17 +57,17 b' class FURLError(Exception):'
57 57
58 58 def check_furl_file_security(furl_file, secure):
59 59 """Remove the old furl_file if changing security modes."""
60 furl_file = expand_path(furl_file)
60 61 if os.path.isfile(furl_file):
61 f = open(furl_file, 'r')
62 oldfurl = f.read().strip()
63 f.close()
62 with open(furl_file, 'r') as f:
63 oldfurl = f.read().strip()
64 64 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
65 65 os.remove(furl_file)
66 66
67 67
68 68 def is_secure(furl):
69 69 """Is the given FURL secure or not."""
70 if is_valid(furl):
70 if is_valid_furl(furl):
71 71 if furl.startswith("pb://"):
72 72 return True
73 73 elif furl.startswith("pbu://"):
@@ -76,26 +76,45 b' def is_secure(furl):'
76 76 raise FURLError("invalid FURL: %s" % furl)
77 77
78 78
79 def is_valid(furl):
79 def is_valid_furl(furl):
80 80 """Is the str a valid FURL or not."""
81 81 if isinstance(furl, str):
82 82 if furl.startswith("pb://") or furl.startswith("pbu://"):
83 83 return True
84 else:
85 return False
84 86 else:
85 87 return False
86 88
87 89
90 def is_valid_furl_file(furl_or_file):
91 """See if furl_or_file exists and contains a valid FURL.
92
93 This doesn't try to read the contents because often we have to validate
94 FURL files that are created, but don't yet have a FURL written to them.
95 """
96 if isinstance(furl_or_file, (str, unicode)):
97 path, furl_filename = os.path.split(furl_or_file)
98 if os.path.isdir(path) and furl_filename.endswith('.furl'):
99 return True
100 return False
101
102
88 103 def find_furl(furl_or_file):
89 """Find, validate and return a FURL in a string or file."""
90 if isinstance(furl_or_file, str):
91 if is_valid(furl_or_file):
92 return furl_or_file
93 if os.path.isfile(furl_or_file):
104 """Find, validate and return a FURL in a string or file.
105
106 This calls :func:`IPython.utils.path.expand_path` on the argument to
107 properly handle ``~`` and ``$`` variables in the path.
108 """
109 if is_valid_furl(furl_or_file):
110 return furl_or_file
111 furl_or_file = expand_path(furl_or_file)
112 if is_valid_furl_file(furl_or_file):
94 113 with open(furl_or_file, 'r') as f:
95 114 furl = f.read().strip()
96 if is_valid(furl):
115 if is_valid_furl(furl):
97 116 return furl
98 raise FURLError("Not a valid FURL or FURL file: %s" % furl_or_file)
117 raise FURLError("Not a valid FURL or FURL file: %r" % furl_or_file)
99 118
100 119
101 120 def is_valid_furl_or_file(furl_or_file):
@@ -106,17 +125,14 b' def is_valid_furl_or_file(furl_or_file):'
106 125 if the FURL file exists or to read its contents. This is useful for
107 126 cases where auto re-connection is being used.
108 127 """
109 if isinstance(furl_or_file, str):
110 if is_valid(furl_or_file):
111 return True
112 if isinstance(furl_or_file, (str, unicode)):
113 path, furl_filename = os.path.split(furl_or_file)
114 if os.path.isdir(path) and furl_filename.endswith('.furl'):
115 return True
116 return False
128 if is_valid_furl(furl_or_file) or is_valid_furl_file(furl_or_file):
129 return True
130 else:
131 return False
117 132
118 133
119 134 def validate_furl_or_file(furl_or_file):
135 """Like :func:`is_valid_furl_or_file`, but raises an error."""
120 136 if not is_valid_furl_or_file(furl_or_file):
121 137 raise FURLError('Not a valid FURL or FURL file: %r' % furl_or_file)
122 138
@@ -17,12 +17,12 b' __docformat__ = "restructuredtext en"'
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 import linecache
21 20 import warnings
22 21
23 22 from twisted.python import components
24 23 from twisted.python.failure import Failure
25 24 from zope.interface import Interface, implements, Attribute
25 from foolscap import DeadReferenceError
26 26
27 27 from IPython.utils.coloransi import TermColors
28 28
@@ -306,85 +306,6 b' class InteractiveMultiEngineClient(object):'
306 306 def __len__(self):
307 307 """Return the number of available engines."""
308 308 return len(self.get_ids())
309
310 #---------------------------------------------------------------------------
311 # Make this a context manager for with
312 #---------------------------------------------------------------------------
313
314 def findsource_file(self,f):
315 linecache.checkcache()
316 s = findsource(f.f_code) # findsource is not defined!
317 lnum = f.f_lineno
318 wsource = s[0][f.f_lineno:]
319 return strip_whitespace(wsource)
320
321 def findsource_ipython(self,f):
322 from IPython.core import ipapi
323 self.ip = ipapi.get()
324 wsource = [l+'\n' for l in
325 self.ip.input_hist_raw[-1].splitlines()[1:]]
326 return strip_whitespace(wsource)
327
328 def __enter__(self):
329 f = sys._getframe(1)
330 local_ns = f.f_locals
331 global_ns = f.f_globals
332 if f.f_code.co_filename == '<ipython console>':
333 s = self.findsource_ipython(f)
334 else:
335 s = self.findsource_file(f)
336
337 self._with_context_result = self.execute(s)
338
339 def __exit__ (self, etype, value, tb):
340 if issubclass(etype,error.StopLocalExecution):
341 return True
342
343
344 def remote():
345 m = 'Special exception to stop local execution of parallel code.'
346 raise error.StopLocalExecution(m)
347
348 def strip_whitespace(source):
349 # Expand tabs to avoid any confusion.
350 wsource = [l.expandtabs(4) for l in source]
351 # Detect the indentation level
352 done = False
353 for line in wsource:
354 if line.isspace():
355 continue
356 for col,char in enumerate(line):
357 if char != ' ':
358 done = True
359 break
360 if done:
361 break
362 # Now we know how much leading space there is in the code. Next, we
363 # extract up to the first line that has less indentation.
364 # WARNINGS: we skip comments that may be misindented, but we do NOT yet
365 # detect triple quoted strings that may have flush left text.
366 for lno,line in enumerate(wsource):
367 lead = line[:col]
368 if lead.isspace():
369 continue
370 else:
371 if not lead.lstrip().startswith('#'):
372 break
373 # The real 'with' source is up to lno
374 src_lines = [l[col:] for l in wsource[:lno+1]]
375
376 # Finally, check that the source's first non-comment line begins with the
377 # special call 'remote()'
378 for nline,line in enumerate(src_lines):
379 if line.isspace() or line.startswith('#'):
380 continue
381 if 'remote()' in line:
382 break
383 else:
384 raise ValueError('remote() call missing at the start of code')
385 src = ''.join(src_lines[nline+1:])
386 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
387 return src
388 309
389 310
390 311 #-------------------------------------------------------------------------------
@@ -444,18 +365,31 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
444 365
445 366 def _findTargetsAndBlock(self, targets=None, block=None):
446 367 return self._findTargets(targets), self._findBlock(block)
447
368
369 def _bcft(self, *args, **kwargs):
370 try:
371 result = blockingCallFromThread(*args, **kwargs)
372 except DeadReferenceError:
373 raise error.ConnectionError(
374 """A connection error has occurred in trying to connect to the
375 controller. This is usually caused by the controller dying or
376 being restarted. To resolve this issue try recreating the
377 multiengine client."""
378 )
379 else:
380 return result
381
448 382 def _blockFromThread(self, function, *args, **kwargs):
449 383 block = kwargs.get('block', None)
450 384 if block is None:
451 385 raise error.MissingBlockArgument("'block' keyword argument is missing")
452 result = blockingCallFromThread(function, *args, **kwargs)
386 result = self._bcft(function, *args, **kwargs)
453 387 if not block:
454 388 result = PendingResult(self, result)
455 389 return result
456 390
457 391 def get_pending_deferred(self, deferredID, block):
458 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
392 return self._bcft(self.smultiengine.get_pending_deferred, deferredID, block)
459 393
460 394 def barrier(self, pendingResults):
461 395 """Synchronize a set of `PendingResults`.
@@ -505,7 +439,7 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
505 439 controller. This method allows the user to clear out all un-retrieved
506 440 results on the controller.
507 441 """
508 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
442 r = self._bcft(self.smultiengine.clear_pending_deferreds)
509 443 return r
510 444
511 445 clear_pending_results = flush
@@ -529,7 +463,7 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
529 463 at a later time.
530 464 """
531 465 targets, block = self._findTargetsAndBlock(targets, block)
532 result = blockingCallFromThread(self.smultiengine.execute, lines,
466 result = self._bcft(self.smultiengine.execute, lines,
533 467 targets=targets, block=block)
534 468 if block:
535 469 result = ResultList(result)
@@ -647,7 +581,7 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
647 581 at a later time.
648 582 """
649 583 targets, block = self._findTargetsAndBlock(targets, block)
650 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
584 result = self._bcft(self.smultiengine.get_result, i, targets=targets, block=block)
651 585 if block:
652 586 result = ResultList(result)
653 587 else:
@@ -773,7 +707,7 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
773 707 """
774 708 Returns the ids of currently registered engines.
775 709 """
776 result = blockingCallFromThread(self.smultiengine.get_ids)
710 result = self._bcft(self.smultiengine.get_ids)
777 711 return result
778 712
779 713 #---------------------------------------------------------------------------
@@ -20,9 +20,10 b' __docformat__ = "restructuredtext en"'
20 20
21 21 from zope.interface import Interface, implements
22 22 from twisted.python import components
23 from foolscap import DeadReferenceError
23 24
24 25 from IPython.kernel.twistedutil import blockingCallFromThread
25 from IPython.kernel import task
26 from IPython.kernel import task, error
26 27 from IPython.kernel.mapper import (
27 28 SynchronousTaskMapper,
28 29 ITaskMapperFactory,
@@ -58,7 +59,20 b' class BlockingTaskClient(object):'
58 59 def __init__(self, task_controller):
59 60 self.task_controller = task_controller
60 61 self.block = True
61
62
63 def _bcft(self, *args, **kwargs):
64 try:
65 result = blockingCallFromThread(*args, **kwargs)
66 except DeadReferenceError:
67 raise error.ConnectionError(
68 """A connection error has occurred in trying to connect to the
69 controller. This is usually caused by the controller dying or
70 being restarted. To resolve this issue try recreating the
71 task client."""
72 )
73 else:
74 return result
75
62 76 def run(self, task, block=False):
63 77 """Run a task on the `TaskController`.
64 78
@@ -71,7 +85,7 b' class BlockingTaskClient(object):'
71 85 :Returns: The int taskid of the submitted task. Pass this to
72 86 `get_task_result` to get the `TaskResult` object.
73 87 """
74 tid = blockingCallFromThread(self.task_controller.run, task)
88 tid = self._bcft(self.task_controller.run, task)
75 89 if block:
76 90 return self.get_task_result(tid, block=True)
77 91 else:
@@ -89,7 +103,7 b' class BlockingTaskClient(object):'
89 103
90 104 :Returns: A `TaskResult` object that encapsulates the task result.
91 105 """
92 return blockingCallFromThread(self.task_controller.get_task_result,
106 return self._bcft(self.task_controller.get_task_result,
93 107 taskid, block)
94 108
95 109 def abort(self, taskid):
@@ -100,7 +114,7 b' class BlockingTaskClient(object):'
100 114 taskid : int
101 115 The taskid of the task to be aborted.
102 116 """
103 return blockingCallFromThread(self.task_controller.abort, taskid)
117 return self._bcft(self.task_controller.abort, taskid)
104 118
105 119 def barrier(self, taskids):
106 120 """Block until a set of tasks are completed.
@@ -109,7 +123,7 b' class BlockingTaskClient(object):'
109 123 taskids : list, tuple
110 124 A sequence of taskids to block on.
111 125 """
112 return blockingCallFromThread(self.task_controller.barrier, taskids)
126 return self._bcft(self.task_controller.barrier, taskids)
113 127
114 128 def spin(self):
115 129 """
@@ -118,7 +132,7 b' class BlockingTaskClient(object):'
118 132 This method only needs to be called in unusual situations where the
119 133 scheduler is idle for some reason.
120 134 """
121 return blockingCallFromThread(self.task_controller.spin)
135 return self._bcft(self.task_controller.spin)
122 136
123 137 def queue_status(self, verbose=False):
124 138 """
@@ -132,7 +146,7 b' class BlockingTaskClient(object):'
132 146 :Returns:
133 147 A dict with the queue status.
134 148 """
135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
149 return self._bcft(self.task_controller.queue_status, verbose)
136 150
137 151 def clear(self):
138 152 """
@@ -143,7 +157,7 b' class BlockingTaskClient(object):'
143 157 tasks. Users should call this periodically to clean out these
144 158 cached task results.
145 159 """
146 return blockingCallFromThread(self.task_controller.clear)
160 return self._bcft(self.task_controller.clear)
147 161
148 162 def map(self, func, *sequences):
149 163 """
@@ -53,8 +53,8 b' class EngineFCTest(DeferredTestCase,'
53 53 # Start a server and append to self.servers
54 54 self.controller_reference = FCRemoteEngineRefFromService(self)
55 55 self.controller_tub = Tub()
56 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
57 self.controller_tub.setLocation('127.0.0.1:10105')
56 self.controller_tub.listenOn('tcp:10111:interface=127.0.0.1')
57 self.controller_tub.setLocation('127.0.0.1:10111')
58 58
59 59 furl = self.controller_tub.registerReference(self.controller_reference)
60 60 self.controller_tub.startService()
@@ -27,7 +27,7 b' from IPython.kernel.multiengine import IMultiEngine'
27 27 from IPython.kernel.tests.multienginetest import IFullSynchronousMultiEngineTestCase
28 28 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
29 29 from IPython.kernel import multiengine as me
30 from IPython.kernel.clientconnector import ClientConnector
30 from IPython.kernel.clientconnector import AsyncClientConnector
31 31 from IPython.kernel.parallelfunction import ParallelFunction
32 32 from IPython.kernel.error import CompositeError
33 33 from IPython.kernel.util import printer
@@ -40,37 +40,30 b' def _raise_it(f):'
40 40 e.raise_exception()
41 41
42 42
43 class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMultiEngineTestCase):
43 class FullSynchronousMultiEngineTestCase(
44 DeferredTestCase, IFullSynchronousMultiEngineTestCase):
44 45
45 # XXX (fperez) this is awful: I'm fully disabling this entire test class.
46 # Right now it's blocking the tests from running at all, and I don't know
47 # how to fix it. I hope Brian can have a stab at it, but at least by doing
48 # this we can run the entire suite to completion.
49 # Once the problem is cleared, remove this skip method.
50 skip = True
51 # END XXX
52
53 46 def setUp(self):
54
47
55 48 self.engines = []
56
49
57 50 self.controller = ControllerService()
58 51 self.controller.startService()
59 52 self.imultiengine = IMultiEngine(self.controller)
60 53 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
61 54
62 55 self.controller_tub = Tub()
63 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
64 self.controller_tub.setLocation('127.0.0.1:10105')
65
56 self.controller_tub.listenOn('tcp:10111:interface=127.0.0.1')
57 self.controller_tub.setLocation('127.0.0.1:10111')
58
66 59 furl = self.controller_tub.registerReference(self.mec_referenceable)
67 60 self.controller_tub.startService()
68
69 self.client_tub = ClientConnector()
70 d = self.client_tub.get_multiengine_client(furl)
61
62 self.client_tub = AsyncClientConnector()
63 d = self.client_tub.get_multiengine_client(furl_or_file=furl)
71 64 d.addCallback(self.handle_got_client)
72 65 return d
73
66
74 67 def handle_got_client(self, client):
75 68 self.multiengine = client
76 69
@@ -95,7 +88,7 b' class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti'
95 88 self.assertEquals(m.dist,'b')
96 89 self.assertEquals(m.targets,'all')
97 90 self.assertEquals(m.block,True)
98
91
99 92 def test_map_default(self):
100 93 self.addEngine(4)
101 94 m = self.multiengine.mapper()
@@ -104,7 +97,7 b' class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti'
104 97 d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10)))
105 98 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
106 99 return d
107
100
108 101 def test_map_noblock(self):
109 102 self.addEngine(4)
110 103 m = self.multiengine.mapper(block=False)
@@ -112,14 +105,14 b' class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti'
112 105 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
113 106 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
114 107 return d
115
108
116 109 def test_mapper_fail(self):
117 110 self.addEngine(4)
118 111 m = self.multiengine.mapper()
119 112 d = m.map(lambda x: 1/0, range(10))
120 113 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
121 114 return d
122
115
123 116 def test_parallel(self):
124 117 self.addEngine(4)
125 118 p = self.multiengine.parallel()
@@ -129,7 +122,7 b' class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti'
129 122 d = f(range(10))
130 123 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
131 124 return d
132
125
133 126 def test_parallel_noblock(self):
134 127 self.addEngine(1)
135 128 p = self.multiengine.parallel(block=False)
@@ -140,7 +133,7 b' class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti'
140 133 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
141 134 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
142 135 return d
143
136
144 137 def test_parallel_fail(self):
145 138 self.addEngine(4)
146 139 p = self.multiengine.parallel()
@@ -150,3 +143,4 b' class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti'
150 143 d = f(range(10))
151 144 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
152 145 return d
146
@@ -31,7 +31,7 b' from IPython.kernel.multienginefc import IFCSynchronousMultiEngine'
31 31 from IPython.kernel.taskfc import IFCTaskController
32 32 from IPython.kernel.util import printer
33 33 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
34 from IPython.kernel.clientconnector import ClientConnector
34 from IPython.kernel.clientconnector import AsyncClientConnector
35 35 from IPython.kernel.error import CompositeError
36 36 from IPython.kernel.parallelfunction import ParallelFunction
37 37
@@ -48,42 +48,34 b' def _raise_it(f):'
48 48
49 49 class TaskTest(DeferredTestCase, ITaskControllerTestCase):
50 50
51 # XXX (fperez) this is awful: I'm fully disabling this entire test class.
52 # Right now it's blocking the tests from running at all, and I don't know
53 # how to fix it. I hope Brian can have a stab at it, but at least by doing
54 # this we can run the entire suite to completion.
55 # Once the problem is cleared, remove this skip method.
56 skip = True
57 # END XXX
58
59 51 def setUp(self):
60
52
61 53 self.engines = []
62
54
63 55 self.controller = cs.ControllerService()
64 56 self.controller.startService()
65 57 self.imultiengine = me.IMultiEngine(self.controller)
66 58 self.itc = taskmodule.ITaskController(self.controller)
67 59 self.itc.failurePenalty = 0
68
60
69 61 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
70 62 self.tc_referenceable = IFCTaskController(self.itc)
71
63
72 64 self.controller_tub = Tub()
73 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
74 self.controller_tub.setLocation('127.0.0.1:10105')
75
65 self.controller_tub.listenOn('tcp:10111:interface=127.0.0.1')
66 self.controller_tub.setLocation('127.0.0.1:10111')
67
76 68 mec_furl = self.controller_tub.registerReference(self.mec_referenceable)
77 69 tc_furl = self.controller_tub.registerReference(self.tc_referenceable)
78 70 self.controller_tub.startService()
79
80 self.client_tub = ClientConnector()
81 d = self.client_tub.get_multiengine_client(mec_furl)
71
72 self.client_tub = AsyncClientConnector()
73 d = self.client_tub.get_multiengine_client(furl_or_file=mec_furl)
82 74 d.addCallback(self.handle_mec_client)
83 d.addCallback(lambda _: self.client_tub.get_task_client(tc_furl))
75 d.addCallback(lambda _: self.client_tub.get_task_client(furl_or_file=tc_furl))
84 76 d.addCallback(self.handle_tc_client)
85 77 return d
86
78
87 79 def handle_mec_client(self, client):
88 80 self.multiengine = client
89 81
@@ -103,7 +95,7 b' class TaskTest(DeferredTestCase, ITaskControllerTestCase):'
103 95 d.addBoth(lambda _: self.controller.stopService())
104 96 dlist.append(d)
105 97 return defer.DeferredList(dlist)
106
98
107 99 def test_mapper(self):
108 100 self.addEngine(1)
109 101 m = self.tc.mapper()
@@ -114,7 +106,7 b' class TaskTest(DeferredTestCase, ITaskControllerTestCase):'
114 106 self.assertEquals(m.recovery_task,None)
115 107 self.assertEquals(m.depend,None)
116 108 self.assertEquals(m.block,True)
117
109
118 110 def test_map_default(self):
119 111 self.addEngine(1)
120 112 m = self.tc.mapper()
@@ -123,21 +115,21 b' class TaskTest(DeferredTestCase, ITaskControllerTestCase):'
123 115 d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10)))
124 116 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
125 117 return d
126
118
127 119 def test_map_noblock(self):
128 120 self.addEngine(1)
129 121 m = self.tc.mapper(block=False)
130 122 d = m.map(lambda x: 2*x, range(10))
131 123 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
132 124 return d
133
125
134 126 def test_mapper_fail(self):
135 127 self.addEngine(1)
136 128 m = self.tc.mapper()
137 129 d = m.map(lambda x: 1/0, range(10))
138 130 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
139 131 return d
140
132
141 133 def test_parallel(self):
142 134 self.addEngine(1)
143 135 p = self.tc.parallel()
@@ -147,7 +139,7 b' class TaskTest(DeferredTestCase, ITaskControllerTestCase):'
147 139 d = f(range(10))
148 140 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
149 141 return d
150
142
151 143 def test_parallel_noblock(self):
152 144 self.addEngine(1)
153 145 p = self.tc.parallel(block=False)
@@ -157,7 +149,7 b' class TaskTest(DeferredTestCase, ITaskControllerTestCase):'
157 149 d = f(range(10))
158 150 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
159 151 return d
160
152
161 153 def test_parallel_fail(self):
162 154 self.addEngine(1)
163 155 p = self.tc.parallel()
@@ -167,3 +159,4 b' class TaskTest(DeferredTestCase, ITaskControllerTestCase):'
167 159 d = f(range(10))
168 160 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
169 161 return d
162
1 NO CONTENT: file was removed
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now