##// END OF EJS Templates
Fixed bugs in IPython.kernel....
Brian Granger -
Show More
@@ -1,6 +1,4 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
1 # encoding: utf-8
3
4 """Facilities for handling client connections to the controller."""
2 """Facilities for handling client connections to the controller."""
5
3
6 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
@@ -20,6 +18,8 b' import os'
20 from IPython.kernel.fcutil import (
18 from IPython.kernel.fcutil import (
21 Tub,
19 Tub,
22 find_furl,
20 find_furl,
21 is_valid_furl,
22 is_valid_furl_file,
23 is_valid_furl_or_file,
23 is_valid_furl_or_file,
24 validate_furl_or_file,
24 validate_furl_or_file,
25 FURLError
25 FURLError
@@ -66,18 +66,30 b' class AsyncClientConnector(object):'
66 def _find_furl(self, profile='default', cluster_dir=None,
66 def _find_furl(self, profile='default', cluster_dir=None,
67 furl_or_file=None, furl_file_name=None,
67 furl_or_file=None, furl_file_name=None,
68 ipython_dir=None):
68 ipython_dir=None):
69 """Find a FURL file by profile+ipython_dir or cluster dir.
69 """Find a FURL file.
70
71 If successful, this returns a FURL file that exists on the file
72 system. The contents of the file have not been checked though. This
73 is because we often have to deal with FURL file whose buffers have
74 not been flushed.
70
75
71 This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception
76 This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception
72 if a FURL file can't be found.
77 if a FURL file can't be found.
78
79 This tries the following:
80
81 1. By the name ``furl_or_file``.
82 2. By ``cluster_dir`` and ``furl_file_name``.
83 3. By cluster profile with a default of ``default``. This uses
84 ``ipython_dir``.
73 """
85 """
74 # Try by furl_or_file
86 # Try by furl_or_file
75 if furl_or_file is not None:
87 if furl_or_file is not None:
76 validate_furl_or_file(furl_or_file)
88 if is_valid_furl_or_file(furl_or_file):
77 return furl_or_file
89 return furl_or_file
78
90
79 if furl_file_name is None:
91 if furl_file_name is None:
80 raise FURLError('A furl_file_name must be provided')
92 raise FURLError('A furl_file_name must be provided if furl_or_file is not')
81
93
82 # Try by cluster_dir
94 # Try by cluster_dir
83 if cluster_dir is not None:
95 if cluster_dir is not None:
@@ -151,7 +163,7 b' class AsyncClientConnector(object):'
151 The full path to a cluster directory. This is useful if profiles
163 The full path to a cluster directory. This is useful if profiles
152 are not being used.
164 are not being used.
153 furl_or_file : str
165 furl_or_file : str
154 A furl or a filename containing a FURLK. This is useful if you
166 A furl or a filename containing a FURL. This is useful if you
155 simply know the location of the FURL file.
167 simply know the location of the FURL file.
156 ipython_dir : str
168 ipython_dir : str
157 The location of the ipython_dir if different from the default.
169 The location of the ipython_dir if different from the default.
@@ -193,7 +205,7 b' class AsyncClientConnector(object):'
193 The full path to a cluster directory. This is useful if profiles
205 The full path to a cluster directory. This is useful if profiles
194 are not being used.
206 are not being used.
195 furl_or_file : str
207 furl_or_file : str
196 A furl or a filename containing a FURLK. This is useful if you
208 A furl or a filename containing a FURL. This is useful if you
197 simply know the location of the FURL file.
209 simply know the location of the FURL file.
198 ipython_dir : str
210 ipython_dir : str
199 The location of the ipython_dir if different from the default.
211 The location of the ipython_dir if different from the default.
@@ -259,6 +271,9 b' class AsyncClientConnector(object):'
259 profile, cluster_dir, furl_or_file,
271 profile, cluster_dir, furl_or_file,
260 furl_file_name, ipython_dir
272 furl_file_name, ipython_dir
261 )
273 )
274 # If this succeeds, we know the furl file exists and has a .furl
275 # extension, but it could still be empty. That is checked each
276 # connection attempt.
262 except FURLError:
277 except FURLError:
263 return defer.fail(failure.Failure())
278 return defer.fail(failure.Failure())
264
279
@@ -349,7 +364,7 b' class ClientConnector(object):'
349 The full path to a cluster directory. This is useful if profiles
364 The full path to a cluster directory. This is useful if profiles
350 are not being used.
365 are not being used.
351 furl_or_file : str
366 furl_or_file : str
352 A furl or a filename containing a FURLK. This is useful if you
367 A furl or a filename containing a FURL. This is useful if you
353 simply know the location of the FURL file.
368 simply know the location of the FURL file.
354 ipython_dir : str
369 ipython_dir : str
355 The location of the ipython_dir if different from the default.
370 The location of the ipython_dir if different from the default.
@@ -390,7 +405,7 b' class ClientConnector(object):'
390 The full path to a cluster directory. This is useful if profiles
405 The full path to a cluster directory. This is useful if profiles
391 are not being used.
406 are not being used.
392 furl_or_file : str
407 furl_or_file : str
393 A furl or a filename containing a FURLK. This is useful if you
408 A furl or a filename containing a FURL. This is useful if you
394 simply know the location of the FURL file.
409 simply know the location of the FURL file.
395 ipython_dir : str
410 ipython_dir : str
396 The location of the ipython_dir if different from the default.
411 The location of the ipython_dir if different from the default.
@@ -23,16 +23,16 b' import tempfile'
23 from twisted.internet import reactor, defer
23 from twisted.internet import reactor, defer
24 from twisted.python import log
24 from twisted.python import log
25
25
26 import foolscap
26 from foolscap import Tub, UnauthenticatedTub
27 from foolscap import Tub, UnauthenticatedTub
27
28
28 from IPython.config.loader import Config
29 from IPython.config.loader import Config
29
30 from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory
30 from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory
31
32 from IPython.kernel.error import SecurityError
31 from IPython.kernel.error import SecurityError
33
32
34 from IPython.utils.traitlets import Int, Str, Bool, Instance
35 from IPython.utils.importstring import import_item
33 from IPython.utils.importstring import import_item
34 from IPython.utils.path import expand_path
35 from IPython.utils.traitlets import Int, Str, Bool, Instance
36
36
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38 # Code
38 # Code
@@ -57,17 +57,17 b' class FURLError(Exception):'
57
57
58 def check_furl_file_security(furl_file, secure):
58 def check_furl_file_security(furl_file, secure):
59 """Remove the old furl_file if changing security modes."""
59 """Remove the old furl_file if changing security modes."""
60 furl_file = expand_path(furl_file)
60 if os.path.isfile(furl_file):
61 if os.path.isfile(furl_file):
61 f = open(furl_file, 'r')
62 with open(furl_file, 'r') as f:
62 oldfurl = f.read().strip()
63 oldfurl = f.read().strip()
63 f.close()
64 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
64 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
65 os.remove(furl_file)
65 os.remove(furl_file)
66
66
67
67
68 def is_secure(furl):
68 def is_secure(furl):
69 """Is the given FURL secure or not."""
69 """Is the given FURL secure or not."""
70 if is_valid(furl):
70 if is_valid_furl(furl):
71 if furl.startswith("pb://"):
71 if furl.startswith("pb://"):
72 return True
72 return True
73 elif furl.startswith("pbu://"):
73 elif furl.startswith("pbu://"):
@@ -76,26 +76,45 b' def is_secure(furl):'
76 raise FURLError("invalid FURL: %s" % furl)
76 raise FURLError("invalid FURL: %s" % furl)
77
77
78
78
79 def is_valid(furl):
79 def is_valid_furl(furl):
80 """Is the str a valid FURL or not."""
80 """Is the str a valid FURL or not."""
81 if isinstance(furl, str):
81 if isinstance(furl, str):
82 if furl.startswith("pb://") or furl.startswith("pbu://"):
82 if furl.startswith("pb://") or furl.startswith("pbu://"):
83 return True
83 return True
84 else:
85 return False
84 else:
86 else:
85 return False
87 return False
86
88
87
89
90 def is_valid_furl_file(furl_or_file):
91 """See if furl_or_file exists and contains a valid FURL.
92
93 This doesn't try to read the contents because often we have to validate
94 FURL files that are created, but don't yet have a FURL written to them.
95 """
96 if isinstance(furl_or_file, (str, unicode)):
97 path, furl_filename = os.path.split(furl_or_file)
98 if os.path.isdir(path) and furl_filename.endswith('.furl'):
99 return True
100 return False
101
102
88 def find_furl(furl_or_file):
103 def find_furl(furl_or_file):
89 """Find, validate and return a FURL in a string or file."""
104 """Find, validate and return a FURL in a string or file.
90 if isinstance(furl_or_file, str):
105
91 if is_valid(furl_or_file):
106 This calls :func:`IPython.utils.path.expand_path` on the argument to
92 return furl_or_file
107 properly handle ``~`` and ``$`` variables in the path.
93 if os.path.isfile(furl_or_file):
108 """
109 if is_valid_furl(furl_or_file):
110 return furl_or_file
111 furl_or_file = expand_path(furl_or_file)
112 if is_valid_furl_file(furl_or_file):
94 with open(furl_or_file, 'r') as f:
113 with open(furl_or_file, 'r') as f:
95 furl = f.read().strip()
114 furl = f.read().strip()
96 if is_valid(furl):
115 if is_valid_furl(furl):
97 return furl
116 return furl
98 raise FURLError("Not a valid FURL or FURL file: %s" % furl_or_file)
117 raise FURLError("Not a valid FURL or FURL file: %r" % furl_or_file)
99
118
100
119
101 def is_valid_furl_or_file(furl_or_file):
120 def is_valid_furl_or_file(furl_or_file):
@@ -106,17 +125,14 b' def is_valid_furl_or_file(furl_or_file):'
106 if the FURL file exists or to read its contents. This is useful for
125 if the FURL file exists or to read its contents. This is useful for
107 cases where auto re-connection is being used.
126 cases where auto re-connection is being used.
108 """
127 """
109 if isinstance(furl_or_file, str):
128 if is_valid_furl(furl_or_file) or is_valid_furl_file(furl_or_file):
110 if is_valid(furl_or_file):
129 return True
111 return True
130 else:
112 if isinstance(furl_or_file, (str, unicode)):
131 return False
113 path, furl_filename = os.path.split(furl_or_file)
114 if os.path.isdir(path) and furl_filename.endswith('.furl'):
115 return True
116 return False
117
132
118
133
119 def validate_furl_or_file(furl_or_file):
134 def validate_furl_or_file(furl_or_file):
135 """Like :func:`is_valid_furl_or_file`, but raises an error."""
120 if not is_valid_furl_or_file(furl_or_file):
136 if not is_valid_furl_or_file(furl_or_file):
121 raise FURLError('Not a valid FURL or FURL file: %r' % furl_or_file)
137 raise FURLError('Not a valid FURL or FURL file: %r' % furl_or_file)
122
138
@@ -17,12 +17,12 b' __docformat__ = "restructuredtext en"'
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import linecache
21 import warnings
20 import warnings
22
21
23 from twisted.python import components
22 from twisted.python import components
24 from twisted.python.failure import Failure
23 from twisted.python.failure import Failure
25 from zope.interface import Interface, implements, Attribute
24 from zope.interface import Interface, implements, Attribute
25 from foolscap import DeadReferenceError
26
26
27 from IPython.utils.coloransi import TermColors
27 from IPython.utils.coloransi import TermColors
28
28
@@ -306,85 +306,6 b' class InteractiveMultiEngineClient(object):'
306 def __len__(self):
306 def __len__(self):
307 """Return the number of available engines."""
307 """Return the number of available engines."""
308 return len(self.get_ids())
308 return len(self.get_ids())
309
310 #---------------------------------------------------------------------------
311 # Make this a context manager for with
312 #---------------------------------------------------------------------------
313
314 def findsource_file(self,f):
315 linecache.checkcache()
316 s = findsource(f.f_code) # findsource is not defined!
317 lnum = f.f_lineno
318 wsource = s[0][f.f_lineno:]
319 return strip_whitespace(wsource)
320
321 def findsource_ipython(self,f):
322 from IPython.core import ipapi
323 self.ip = ipapi.get()
324 wsource = [l+'\n' for l in
325 self.ip.input_hist_raw[-1].splitlines()[1:]]
326 return strip_whitespace(wsource)
327
328 def __enter__(self):
329 f = sys._getframe(1)
330 local_ns = f.f_locals
331 global_ns = f.f_globals
332 if f.f_code.co_filename == '<ipython console>':
333 s = self.findsource_ipython(f)
334 else:
335 s = self.findsource_file(f)
336
337 self._with_context_result = self.execute(s)
338
339 def __exit__ (self, etype, value, tb):
340 if issubclass(etype,error.StopLocalExecution):
341 return True
342
343
344 def remote():
345 m = 'Special exception to stop local execution of parallel code.'
346 raise error.StopLocalExecution(m)
347
348 def strip_whitespace(source):
349 # Expand tabs to avoid any confusion.
350 wsource = [l.expandtabs(4) for l in source]
351 # Detect the indentation level
352 done = False
353 for line in wsource:
354 if line.isspace():
355 continue
356 for col,char in enumerate(line):
357 if char != ' ':
358 done = True
359 break
360 if done:
361 break
362 # Now we know how much leading space there is in the code. Next, we
363 # extract up to the first line that has less indentation.
364 # WARNINGS: we skip comments that may be misindented, but we do NOT yet
365 # detect triple quoted strings that may have flush left text.
366 for lno,line in enumerate(wsource):
367 lead = line[:col]
368 if lead.isspace():
369 continue
370 else:
371 if not lead.lstrip().startswith('#'):
372 break
373 # The real 'with' source is up to lno
374 src_lines = [l[col:] for l in wsource[:lno+1]]
375
376 # Finally, check that the source's first non-comment line begins with the
377 # special call 'remote()'
378 for nline,line in enumerate(src_lines):
379 if line.isspace() or line.startswith('#'):
380 continue
381 if 'remote()' in line:
382 break
383 else:
384 raise ValueError('remote() call missing at the start of code')
385 src = ''.join(src_lines[nline+1:])
386 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
387 return src
388
309
389
310
390 #-------------------------------------------------------------------------------
311 #-------------------------------------------------------------------------------
@@ -444,18 +365,31 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
444
365
445 def _findTargetsAndBlock(self, targets=None, block=None):
366 def _findTargetsAndBlock(self, targets=None, block=None):
446 return self._findTargets(targets), self._findBlock(block)
367 return self._findTargets(targets), self._findBlock(block)
447
368
369 def _bcft(self, *args, **kwargs):
370 try:
371 result = blockingCallFromThread(*args, **kwargs)
372 except DeadReferenceError:
373 raise error.ConnectionError(
374 """A connection error has occurred in trying to connect to the
375 controller. This is usually caused by the controller dying or
376 being restarted. To resolve this issue try recreating the
377 multiengine client."""
378 )
379 else:
380 return result
381
448 def _blockFromThread(self, function, *args, **kwargs):
382 def _blockFromThread(self, function, *args, **kwargs):
449 block = kwargs.get('block', None)
383 block = kwargs.get('block', None)
450 if block is None:
384 if block is None:
451 raise error.MissingBlockArgument("'block' keyword argument is missing")
385 raise error.MissingBlockArgument("'block' keyword argument is missing")
452 result = blockingCallFromThread(function, *args, **kwargs)
386 result = self._bcft(function, *args, **kwargs)
453 if not block:
387 if not block:
454 result = PendingResult(self, result)
388 result = PendingResult(self, result)
455 return result
389 return result
456
390
457 def get_pending_deferred(self, deferredID, block):
391 def get_pending_deferred(self, deferredID, block):
458 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
392 return self._bcft(self.smultiengine.get_pending_deferred, deferredID, block)
459
393
460 def barrier(self, pendingResults):
394 def barrier(self, pendingResults):
461 """Synchronize a set of `PendingResults`.
395 """Synchronize a set of `PendingResults`.
@@ -505,7 +439,7 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
505 controller. This method allows the user to clear out all un-retrieved
439 controller. This method allows the user to clear out all un-retrieved
506 results on the controller.
440 results on the controller.
507 """
441 """
508 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
442 r = self._bcft(self.smultiengine.clear_pending_deferreds)
509 return r
443 return r
510
444
511 clear_pending_results = flush
445 clear_pending_results = flush
@@ -529,7 +463,7 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
529 at a later time.
463 at a later time.
530 """
464 """
531 targets, block = self._findTargetsAndBlock(targets, block)
465 targets, block = self._findTargetsAndBlock(targets, block)
532 result = blockingCallFromThread(self.smultiengine.execute, lines,
466 result = self._bcft(self.smultiengine.execute, lines,
533 targets=targets, block=block)
467 targets=targets, block=block)
534 if block:
468 if block:
535 result = ResultList(result)
469 result = ResultList(result)
@@ -647,7 +581,7 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
647 at a later time.
581 at a later time.
648 """
582 """
649 targets, block = self._findTargetsAndBlock(targets, block)
583 targets, block = self._findTargetsAndBlock(targets, block)
650 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
584 result = self._bcft(self.smultiengine.get_result, i, targets=targets, block=block)
651 if block:
585 if block:
652 result = ResultList(result)
586 result = ResultList(result)
653 else:
587 else:
@@ -773,7 +707,7 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
773 """
707 """
774 Returns the ids of currently registered engines.
708 Returns the ids of currently registered engines.
775 """
709 """
776 result = blockingCallFromThread(self.smultiengine.get_ids)
710 result = self._bcft(self.smultiengine.get_ids)
777 return result
711 return result
778
712
779 #---------------------------------------------------------------------------
713 #---------------------------------------------------------------------------
@@ -20,9 +20,10 b' __docformat__ = "restructuredtext en"'
20
20
21 from zope.interface import Interface, implements
21 from zope.interface import Interface, implements
22 from twisted.python import components
22 from twisted.python import components
23 from foolscap import DeadReferenceError
23
24
24 from IPython.kernel.twistedutil import blockingCallFromThread
25 from IPython.kernel.twistedutil import blockingCallFromThread
25 from IPython.kernel import task
26 from IPython.kernel import task, error
26 from IPython.kernel.mapper import (
27 from IPython.kernel.mapper import (
27 SynchronousTaskMapper,
28 SynchronousTaskMapper,
28 ITaskMapperFactory,
29 ITaskMapperFactory,
@@ -58,7 +59,20 b' class BlockingTaskClient(object):'
58 def __init__(self, task_controller):
59 def __init__(self, task_controller):
59 self.task_controller = task_controller
60 self.task_controller = task_controller
60 self.block = True
61 self.block = True
61
62
63 def _bcft(self, *args, **kwargs):
64 try:
65 result = blockingCallFromThread(*args, **kwargs)
66 except DeadReferenceError:
67 raise error.ConnectionError(
68 """A connection error has occurred in trying to connect to the
69 controller. This is usually caused by the controller dying or
70 being restarted. To resolve this issue try recreating the
71 task client."""
72 )
73 else:
74 return result
75
62 def run(self, task, block=False):
76 def run(self, task, block=False):
63 """Run a task on the `TaskController`.
77 """Run a task on the `TaskController`.
64
78
@@ -71,7 +85,7 b' class BlockingTaskClient(object):'
71 :Returns: The int taskid of the submitted task. Pass this to
85 :Returns: The int taskid of the submitted task. Pass this to
72 `get_task_result` to get the `TaskResult` object.
86 `get_task_result` to get the `TaskResult` object.
73 """
87 """
74 tid = blockingCallFromThread(self.task_controller.run, task)
88 tid = self._bcft(self.task_controller.run, task)
75 if block:
89 if block:
76 return self.get_task_result(tid, block=True)
90 return self.get_task_result(tid, block=True)
77 else:
91 else:
@@ -89,7 +103,7 b' class BlockingTaskClient(object):'
89
103
90 :Returns: A `TaskResult` object that encapsulates the task result.
104 :Returns: A `TaskResult` object that encapsulates the task result.
91 """
105 """
92 return blockingCallFromThread(self.task_controller.get_task_result,
106 return self._bcft(self.task_controller.get_task_result,
93 taskid, block)
107 taskid, block)
94
108
95 def abort(self, taskid):
109 def abort(self, taskid):
@@ -100,7 +114,7 b' class BlockingTaskClient(object):'
100 taskid : int
114 taskid : int
101 The taskid of the task to be aborted.
115 The taskid of the task to be aborted.
102 """
116 """
103 return blockingCallFromThread(self.task_controller.abort, taskid)
117 return self._bcft(self.task_controller.abort, taskid)
104
118
105 def barrier(self, taskids):
119 def barrier(self, taskids):
106 """Block until a set of tasks are completed.
120 """Block until a set of tasks are completed.
@@ -109,7 +123,7 b' class BlockingTaskClient(object):'
109 taskids : list, tuple
123 taskids : list, tuple
110 A sequence of taskids to block on.
124 A sequence of taskids to block on.
111 """
125 """
112 return blockingCallFromThread(self.task_controller.barrier, taskids)
126 return self._bcft(self.task_controller.barrier, taskids)
113
127
114 def spin(self):
128 def spin(self):
115 """
129 """
@@ -118,7 +132,7 b' class BlockingTaskClient(object):'
118 This method only needs to be called in unusual situations where the
132 This method only needs to be called in unusual situations where the
119 scheduler is idle for some reason.
133 scheduler is idle for some reason.
120 """
134 """
121 return blockingCallFromThread(self.task_controller.spin)
135 return self._bcft(self.task_controller.spin)
122
136
123 def queue_status(self, verbose=False):
137 def queue_status(self, verbose=False):
124 """
138 """
@@ -132,7 +146,7 b' class BlockingTaskClient(object):'
132 :Returns:
146 :Returns:
133 A dict with the queue status.
147 A dict with the queue status.
134 """
148 """
135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
149 return self._bcft(self.task_controller.queue_status, verbose)
136
150
137 def clear(self):
151 def clear(self):
138 """
152 """
@@ -143,7 +157,7 b' class BlockingTaskClient(object):'
143 tasks. Users should call this periodically to clean out these
157 tasks. Users should call this periodically to clean out these
144 cached task results.
158 cached task results.
145 """
159 """
146 return blockingCallFromThread(self.task_controller.clear)
160 return self._bcft(self.task_controller.clear)
147
161
148 def map(self, func, *sequences):
162 def map(self, func, *sequences):
149 """
163 """
@@ -53,8 +53,8 b' class EngineFCTest(DeferredTestCase,'
53 # Start a server and append to self.servers
53 # Start a server and append to self.servers
54 self.controller_reference = FCRemoteEngineRefFromService(self)
54 self.controller_reference = FCRemoteEngineRefFromService(self)
55 self.controller_tub = Tub()
55 self.controller_tub = Tub()
56 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
56 self.controller_tub.listenOn('tcp:10111:interface=127.0.0.1')
57 self.controller_tub.setLocation('127.0.0.1:10105')
57 self.controller_tub.setLocation('127.0.0.1:10111')
58
58
59 furl = self.controller_tub.registerReference(self.controller_reference)
59 furl = self.controller_tub.registerReference(self.controller_reference)
60 self.controller_tub.startService()
60 self.controller_tub.startService()
@@ -27,7 +27,7 b' from IPython.kernel.multiengine import IMultiEngine'
27 from IPython.kernel.tests.multienginetest import IFullSynchronousMultiEngineTestCase
27 from IPython.kernel.tests.multienginetest import IFullSynchronousMultiEngineTestCase
28 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
28 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
29 from IPython.kernel import multiengine as me
29 from IPython.kernel import multiengine as me
30 from IPython.kernel.clientconnector import ClientConnector
30 from IPython.kernel.clientconnector import AsyncClientConnector
31 from IPython.kernel.parallelfunction import ParallelFunction
31 from IPython.kernel.parallelfunction import ParallelFunction
32 from IPython.kernel.error import CompositeError
32 from IPython.kernel.error import CompositeError
33 from IPython.kernel.util import printer
33 from IPython.kernel.util import printer
@@ -40,37 +40,30 b' def _raise_it(f):'
40 e.raise_exception()
40 e.raise_exception()
41
41
42
42
43 class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMultiEngineTestCase):
43 class FullSynchronousMultiEngineTestCase(
44 DeferredTestCase, IFullSynchronousMultiEngineTestCase):
44
45
45 # XXX (fperez) this is awful: I'm fully disabling this entire test class.
46 # Right now it's blocking the tests from running at all, and I don't know
47 # how to fix it. I hope Brian can have a stab at it, but at least by doing
48 # this we can run the entire suite to completion.
49 # Once the problem is cleared, remove this skip method.
50 skip = True
51 # END XXX
52
53 def setUp(self):
46 def setUp(self):
54
47
55 self.engines = []
48 self.engines = []
56
49
57 self.controller = ControllerService()
50 self.controller = ControllerService()
58 self.controller.startService()
51 self.controller.startService()
59 self.imultiengine = IMultiEngine(self.controller)
52 self.imultiengine = IMultiEngine(self.controller)
60 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
53 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
61
54
62 self.controller_tub = Tub()
55 self.controller_tub = Tub()
63 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
56 self.controller_tub.listenOn('tcp:10111:interface=127.0.0.1')
64 self.controller_tub.setLocation('127.0.0.1:10105')
57 self.controller_tub.setLocation('127.0.0.1:10111')
65
58
66 furl = self.controller_tub.registerReference(self.mec_referenceable)
59 furl = self.controller_tub.registerReference(self.mec_referenceable)
67 self.controller_tub.startService()
60 self.controller_tub.startService()
68
61
69 self.client_tub = ClientConnector()
62 self.client_tub = AsyncClientConnector()
70 d = self.client_tub.get_multiengine_client(furl)
63 d = self.client_tub.get_multiengine_client(furl_or_file=furl)
71 d.addCallback(self.handle_got_client)
64 d.addCallback(self.handle_got_client)
72 return d
65 return d
73
66
74 def handle_got_client(self, client):
67 def handle_got_client(self, client):
75 self.multiengine = client
68 self.multiengine = client
76
69
@@ -95,7 +88,7 b' class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti'
95 self.assertEquals(m.dist,'b')
88 self.assertEquals(m.dist,'b')
96 self.assertEquals(m.targets,'all')
89 self.assertEquals(m.targets,'all')
97 self.assertEquals(m.block,True)
90 self.assertEquals(m.block,True)
98
91
99 def test_map_default(self):
92 def test_map_default(self):
100 self.addEngine(4)
93 self.addEngine(4)
101 m = self.multiengine.mapper()
94 m = self.multiengine.mapper()
@@ -104,7 +97,7 b' class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti'
104 d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10)))
97 d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10)))
105 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
98 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
106 return d
99 return d
107
100
108 def test_map_noblock(self):
101 def test_map_noblock(self):
109 self.addEngine(4)
102 self.addEngine(4)
110 m = self.multiengine.mapper(block=False)
103 m = self.multiengine.mapper(block=False)
@@ -112,14 +105,14 b' class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti'
112 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
105 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
113 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
106 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
114 return d
107 return d
115
108
116 def test_mapper_fail(self):
109 def test_mapper_fail(self):
117 self.addEngine(4)
110 self.addEngine(4)
118 m = self.multiengine.mapper()
111 m = self.multiengine.mapper()
119 d = m.map(lambda x: 1/0, range(10))
112 d = m.map(lambda x: 1/0, range(10))
120 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
113 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
121 return d
114 return d
122
115
123 def test_parallel(self):
116 def test_parallel(self):
124 self.addEngine(4)
117 self.addEngine(4)
125 p = self.multiengine.parallel()
118 p = self.multiengine.parallel()
@@ -129,7 +122,7 b' class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti'
129 d = f(range(10))
122 d = f(range(10))
130 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
123 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
131 return d
124 return d
132
125
133 def test_parallel_noblock(self):
126 def test_parallel_noblock(self):
134 self.addEngine(1)
127 self.addEngine(1)
135 p = self.multiengine.parallel(block=False)
128 p = self.multiengine.parallel(block=False)
@@ -140,7 +133,7 b' class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti'
140 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
133 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
141 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
134 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
142 return d
135 return d
143
136
144 def test_parallel_fail(self):
137 def test_parallel_fail(self):
145 self.addEngine(4)
138 self.addEngine(4)
146 p = self.multiengine.parallel()
139 p = self.multiengine.parallel()
@@ -150,3 +143,4 b' class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMulti'
150 d = f(range(10))
143 d = f(range(10))
151 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
144 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
152 return d
145 return d
146
@@ -31,7 +31,7 b' from IPython.kernel.multienginefc import IFCSynchronousMultiEngine'
31 from IPython.kernel.taskfc import IFCTaskController
31 from IPython.kernel.taskfc import IFCTaskController
32 from IPython.kernel.util import printer
32 from IPython.kernel.util import printer
33 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
33 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
34 from IPython.kernel.clientconnector import ClientConnector
34 from IPython.kernel.clientconnector import AsyncClientConnector
35 from IPython.kernel.error import CompositeError
35 from IPython.kernel.error import CompositeError
36 from IPython.kernel.parallelfunction import ParallelFunction
36 from IPython.kernel.parallelfunction import ParallelFunction
37
37
@@ -48,42 +48,34 b' def _raise_it(f):'
48
48
49 class TaskTest(DeferredTestCase, ITaskControllerTestCase):
49 class TaskTest(DeferredTestCase, ITaskControllerTestCase):
50
50
51 # XXX (fperez) this is awful: I'm fully disabling this entire test class.
52 # Right now it's blocking the tests from running at all, and I don't know
53 # how to fix it. I hope Brian can have a stab at it, but at least by doing
54 # this we can run the entire suite to completion.
55 # Once the problem is cleared, remove this skip method.
56 skip = True
57 # END XXX
58
59 def setUp(self):
51 def setUp(self):
60
52
61 self.engines = []
53 self.engines = []
62
54
63 self.controller = cs.ControllerService()
55 self.controller = cs.ControllerService()
64 self.controller.startService()
56 self.controller.startService()
65 self.imultiengine = me.IMultiEngine(self.controller)
57 self.imultiengine = me.IMultiEngine(self.controller)
66 self.itc = taskmodule.ITaskController(self.controller)
58 self.itc = taskmodule.ITaskController(self.controller)
67 self.itc.failurePenalty = 0
59 self.itc.failurePenalty = 0
68
60
69 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
61 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
70 self.tc_referenceable = IFCTaskController(self.itc)
62 self.tc_referenceable = IFCTaskController(self.itc)
71
63
72 self.controller_tub = Tub()
64 self.controller_tub = Tub()
73 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
65 self.controller_tub.listenOn('tcp:10111:interface=127.0.0.1')
74 self.controller_tub.setLocation('127.0.0.1:10105')
66 self.controller_tub.setLocation('127.0.0.1:10111')
75
67
76 mec_furl = self.controller_tub.registerReference(self.mec_referenceable)
68 mec_furl = self.controller_tub.registerReference(self.mec_referenceable)
77 tc_furl = self.controller_tub.registerReference(self.tc_referenceable)
69 tc_furl = self.controller_tub.registerReference(self.tc_referenceable)
78 self.controller_tub.startService()
70 self.controller_tub.startService()
79
71
80 self.client_tub = ClientConnector()
72 self.client_tub = AsyncClientConnector()
81 d = self.client_tub.get_multiengine_client(mec_furl)
73 d = self.client_tub.get_multiengine_client(furl_or_file=mec_furl)
82 d.addCallback(self.handle_mec_client)
74 d.addCallback(self.handle_mec_client)
83 d.addCallback(lambda _: self.client_tub.get_task_client(tc_furl))
75 d.addCallback(lambda _: self.client_tub.get_task_client(furl_or_file=tc_furl))
84 d.addCallback(self.handle_tc_client)
76 d.addCallback(self.handle_tc_client)
85 return d
77 return d
86
78
87 def handle_mec_client(self, client):
79 def handle_mec_client(self, client):
88 self.multiengine = client
80 self.multiengine = client
89
81
@@ -103,7 +95,7 b' class TaskTest(DeferredTestCase, ITaskControllerTestCase):'
103 d.addBoth(lambda _: self.controller.stopService())
95 d.addBoth(lambda _: self.controller.stopService())
104 dlist.append(d)
96 dlist.append(d)
105 return defer.DeferredList(dlist)
97 return defer.DeferredList(dlist)
106
98
107 def test_mapper(self):
99 def test_mapper(self):
108 self.addEngine(1)
100 self.addEngine(1)
109 m = self.tc.mapper()
101 m = self.tc.mapper()
@@ -114,7 +106,7 b' class TaskTest(DeferredTestCase, ITaskControllerTestCase):'
114 self.assertEquals(m.recovery_task,None)
106 self.assertEquals(m.recovery_task,None)
115 self.assertEquals(m.depend,None)
107 self.assertEquals(m.depend,None)
116 self.assertEquals(m.block,True)
108 self.assertEquals(m.block,True)
117
109
118 def test_map_default(self):
110 def test_map_default(self):
119 self.addEngine(1)
111 self.addEngine(1)
120 m = self.tc.mapper()
112 m = self.tc.mapper()
@@ -123,21 +115,21 b' class TaskTest(DeferredTestCase, ITaskControllerTestCase):'
123 d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10)))
115 d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10)))
124 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
116 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
125 return d
117 return d
126
118
127 def test_map_noblock(self):
119 def test_map_noblock(self):
128 self.addEngine(1)
120 self.addEngine(1)
129 m = self.tc.mapper(block=False)
121 m = self.tc.mapper(block=False)
130 d = m.map(lambda x: 2*x, range(10))
122 d = m.map(lambda x: 2*x, range(10))
131 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
123 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
132 return d
124 return d
133
125
134 def test_mapper_fail(self):
126 def test_mapper_fail(self):
135 self.addEngine(1)
127 self.addEngine(1)
136 m = self.tc.mapper()
128 m = self.tc.mapper()
137 d = m.map(lambda x: 1/0, range(10))
129 d = m.map(lambda x: 1/0, range(10))
138 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
130 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
139 return d
131 return d
140
132
141 def test_parallel(self):
133 def test_parallel(self):
142 self.addEngine(1)
134 self.addEngine(1)
143 p = self.tc.parallel()
135 p = self.tc.parallel()
@@ -147,7 +139,7 b' class TaskTest(DeferredTestCase, ITaskControllerTestCase):'
147 d = f(range(10))
139 d = f(range(10))
148 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
140 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
149 return d
141 return d
150
142
151 def test_parallel_noblock(self):
143 def test_parallel_noblock(self):
152 self.addEngine(1)
144 self.addEngine(1)
153 p = self.tc.parallel(block=False)
145 p = self.tc.parallel(block=False)
@@ -157,7 +149,7 b' class TaskTest(DeferredTestCase, ITaskControllerTestCase):'
157 d = f(range(10))
149 d = f(range(10))
158 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
150 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
159 return d
151 return d
160
152
161 def test_parallel_fail(self):
153 def test_parallel_fail(self):
162 self.addEngine(1)
154 self.addEngine(1)
163 p = self.tc.parallel()
155 p = self.tc.parallel()
@@ -167,3 +159,4 b' class TaskTest(DeferredTestCase, ITaskControllerTestCase):'
167 d = f(range(10))
159 d = f(range(10))
168 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
160 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
169 return d
161 return d
162
1 NO CONTENT: file was removed
NO CONTENT: file was removed
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now