testing fixes
MinRK
@@ -35,7 +35,7 @@ class AsyncResult(object):
35 35
36 36 msg_ids = None
37 37
38 def __init__(self, client, msg_ids, fname=''):
38 def __init__(self, client, msg_ids, fname='unknown'):
39 39 self._client = client
40 40 if isinstance(msg_ids, basestring):
41 41 msg_ids = [msg_ids]
@@ -265,7 +265,7 @@ class AsyncHubResult(AsyncResult):
265 265 else:
266 266 rdict = self._client.result_status(remote_ids, status_only=False)
267 267 pending = rdict['pending']
268 while pending and time.time() < start+timeout:
268 while pending and (timeout < 0 or time.time() < start+timeout):
269 269 rdict = self._client.result_status(remote_ids, status_only=False)
270 270 pending = rdict['pending']
271 271 if pending:
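Note: with the condition above, a negative timeout now means "wait until done" rather than "give up immediately". A minimal standalone sketch of that loop shape (names are illustrative, not part of the changeset):

    import time

    def wait_for(still_pending, timeout=-1, interval=0.1):
        # timeout < 0 means wait indefinitely, mirroring the loop above
        start = time.time()
        pending = still_pending()
        while pending and (timeout < 0 or time.time() < start + timeout):
            time.sleep(interval)
            pending = still_pending()
        return pending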
@@ -360,16 +360,17 @@ class Client(HasTraits):
360 360 if cluster_dir is not None:
361 361 try:
362 362 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
363 return
363 364 except ClusterDirError:
364 365 pass
365 366 elif profile is not None:
366 367 try:
367 368 self._cd = ClusterDir.find_cluster_dir_by_profile(
368 369 ipython_dir, profile)
370 return
369 371 except ClusterDirError:
370 372 pass
371 else:
372 self._cd = None
373 self._cd = None
373 374
374 375 @property
375 376 def ids(self):
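Note: the added return statements make a successful cluster-dir lookup skip the new self._cd = None fallback. In practice the lookup is driven by the Client constructor arguments, roughly like this (assumes a running cluster started with the 'iptest' profile, as in the tests below):

    from IPython.zmq.parallel import client as clientmod

    rc = clientmod.Client(profile='iptest')   # resolved via find_cluster_dir_by_profile
    print rc.ids                              # integer ids of the connected engines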
@@ -489,9 +490,9 @@ class Client(HasTraits):
489 490 """unwrap exception, and remap engineid to int."""
490 491 e = ss.unwrap_exception(content)
491 492 if e.engine_info:
492 e_uuid = e.engine_info['engineid']
493 e_uuid = e.engine_info['engine_uuid']
493 494 eid = self._engines[e_uuid]
494 e.engine_info['engineid'] = eid
495 e.engine_info['engine_id'] = eid
495 496 return e
496 497
497 498 def _register_engine(self, msg):
@@ -1338,11 +1339,11 @@ class Client(HasTraits):
1338 1339 be lists of msg_ids that are incomplete or complete. If `status_only`
1339 1340 is False, then completed results will be keyed by their `msg_id`.
1340 1341 """
1341 if not isinstance(indices_or_msg_ids, (list,tuple)):
1342 indices_or_msg_ids = [indices_or_msg_ids]
1342 if not isinstance(msg_ids, (list,tuple)):
1343 msg_ids = [msg_ids]
1343 1344
1344 1345 theids = []
1345 for msg_id in indices_or_msg_ids:
1346 for msg_id in msg_ids:
1346 1347 if isinstance(msg_id, int):
1347 1348 msg_id = self.history[msg_id]
1348 1349 if not isinstance(msg_id, basestring):
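Note: the renamed loop accepts either integer indices into the client's history or msg_id strings. Pulled out on its own, the normalization step looks roughly like this (helper name is illustrative):

    def normalize_msg_ids(msg_ids, history):
        # accept a single item or a sequence; map int indices through history
        if not isinstance(msg_ids, (list, tuple)):
            msg_ids = [msg_ids]
        theids = []
        for msg_id in msg_ids:
            if isinstance(msg_id, int):
                msg_id = history[msg_id]
            if not isinstance(msg_id, basestring):
                raise TypeError("msg_ids must be str, not %r" % msg_id)
            theids.append(msg_id)
        return theids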
@@ -175,7 +175,7 @@ class RemoteError(KernelError):
175 175 self.args=(ename, evalue)
176 176
177 177 def __repr__(self):
178 engineid = self.engine_info.get('engineid', ' ')
178 engineid = self.engine_info.get('engine_id', ' ')
179 179 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
180 180
181 181 def __str__(self):
@@ -702,7 +702,7 @@ class Hub(LoggingFactory):
702 702 self.log.error("task::invalid task tracking message", exc_info=True)
703 703 return
704 704 content = msg['content']
705 print (content)
705 # print (content)
706 706 msg_id = content['msg_id']
707 707 engine_uuid = content['engine_id']
708 708 eid = self.by_ident[engine_uuid]
@@ -728,7 +728,7 @@ class Hub(LoggingFactory):
728 728
729 729 def save_iopub_message(self, topics, msg):
730 730 """save an iopub message into the db"""
731 print (topics)
731 # print (topics)
732 732 try:
733 733 msg = self.session.unpack_message(msg, content=True)
734 734 except:
@@ -12,6 +12,8 @@
12 12
13 13 import warnings
14 14
15 from IPython.testing import decorators as testdec
16
15 17 import map as Map
16 18 from asyncresult import AsyncMapResult
17 19
@@ -19,26 +21,32 @@ from asyncresult import AsyncMapResult
19 21 # Decorators
20 22 #-----------------------------------------------------------------------------
21 23
24 @testdec.skip_doctest
22 25 def remote(client, bound=True, block=None, targets=None, balanced=None):
23 26 """Turn a function into a remote function.
24 27
25 28 This method can be used for map:
26 29
27 >>> @remote(client,block=True)
28 def func(a)
30 In [1]: @remote(client,block=True)
31 ...: def func(a):
32 ...: pass
29 33 """
34
30 35 def remote_function(f):
31 36 return RemoteFunction(client, f, bound, block, targets, balanced)
32 37 return remote_function
33 38
39 @testdec.skip_doctest
34 40 def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None):
35 41 """Turn a function into a parallel remote function.
36 42
37 43 This method can be used for map:
38 44
39 >>> @parallel(client,block=True)
40 def func(a)
45 In [1]: @parallel(client,block=True)
46 ...: def func(a):
47 ...: pass
41 48 """
49
42 50 def parallel_function(f):
43 51 return ParallelFunction(client, f, dist, bound, block, targets, balanced)
44 52 return parallel_function
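Note: besides switching the docstrings to IPython-prompt form, both decorators are now excluded from doctest collection via skip_doctest. Typical usage, assuming rc is a connected Client:

    @remote(rc, block=True)
    def getpid():
        import os
        return os.getpid()

    @parallel(rc, block=True)
    def echo(x):
        return x

    pids = getpid()            # executes on the remote targets
    parts = echo(range(16))    # the sequence is partitioned across engines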
@@ -104,7 +104,7 @@ class Kernel(SessionFactory):
104 104 self._initial_exec_lines()
105 105
106 106 def _wrap_exception(self, method=None):
107 e_info = dict(engineid=self.ident, method=method)
107 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
108 108 content=wrap_exception(e_info)
109 109 return content
110 110
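Note: the kernel now reports both identifiers, matching what the client-side _unwrap_exception and RemoteError.__repr__ changes read. Roughly what the resulting engine_info carries (values are illustrative):

    e_info = dict(
        engine_uuid='4f2c...',   # the engine's zmq identity (illustrative)
        engine_id=3,             # the integer id assigned by the Hub
        method='apply',          # the kernel method that raised (illustrative)
    )
    # RemoteError.__repr__ then renders something like <Remote[3]:NameError(...)>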
@@ -29,7 +29,6 @@ def teardown():
29 29 p = processes.pop()
30 30 if p.poll() is None:
31 31 try:
32 print 'terminating'
33 32 p.terminate()
34 33 except Exception, e:
35 34 print e
@@ -17,7 +17,7 @@ from IPython.zmq.parallel.tests import processes,add_engine
17 17 # simple tasks for use in apply tests
18 18
19 19 def segfault():
20 """"""
20 """this will segfault"""
21 21 import ctypes
22 22 ctypes.memset(-1,0,1)
23 23
@@ -73,9 +73,10 @@ class ClusterTestCase(BaseZMQTestCase):
73 73
74 74 def assertRaisesRemote(self, etype, f, *args, **kwargs):
75 75 try:
76 f(*args, **kwargs)
77 except error.CompositeError as e:
78 e.raise_exception()
76 try:
77 f(*args, **kwargs)
78 except error.CompositeError as e:
79 e.raise_exception()
79 80 except error.RemoteError as e:
80 81 self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(e.ename, etype.__name__))
81 82 else:
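Note: the added inner try unwraps a CompositeError into the underlying RemoteError before the name comparison. A small illustrative test method using only calls that appear elsewhere in this changeset:

    # hypothetical test inside a ClusterTestCase subclass
    def test_pull_undefined(self):
        self.client.block = True
        self.assertRaisesRemote(NameError, self.client.pull, 'a')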
@@ -87,10 +88,11 @@ class ClusterTestCase(BaseZMQTestCase):
87 88 self.base_engine_count=len(self.client.ids)
88 89 self.engines=[]
89 90
90 def tearDown(self):
91 [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
92 # while len(self.client.ids) > self.base_engine_count:
93 # time.sleep(.1)
94 del self.engines
95 BaseZMQTestCase.tearDown(self)
91 # def tearDown(self):
92 # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
93 # [ e.wait() for e in self.engines ]
94 # while len(self.client.ids) > self.base_engine_count:
95 # time.sleep(.1)
96 # del self.engines
97 # BaseZMQTestCase.tearDown(self)
96 98 No newline at end of file
@@ -2,28 +2,35 @@ import time
2 2
3 3 import nose.tools as nt
4 4
5 from IPython.zmq.parallel.asyncresult import AsyncResult
5 from IPython.zmq.parallel import client as clientmod
6 from IPython.zmq.parallel import error
7 from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult
6 8 from IPython.zmq.parallel.view import LoadBalancedView, DirectView
7 9
8 from clienttest import ClusterTestCase, segfault
10 from clienttest import ClusterTestCase, segfault, wait
9 11
10 12 class TestClient(ClusterTestCase):
11 13
12 14 def test_ids(self):
13 self.assertEquals(len(self.client.ids), 1)
15 n = len(self.client.ids)
14 16 self.add_engines(3)
15 self.assertEquals(len(self.client.ids), 4)
17 self.assertEquals(len(self.client.ids), n+3)
18 self.assertTrue
16 19
17 20 def test_segfault(self):
21 """test graceful handling of engine death"""
18 22 self.add_engines(1)
19 23 eid = self.client.ids[-1]
20 self.client[eid].apply(segfault)
24 ar = self.client.apply(segfault, block=False)
25 self.assertRaisesRemote(error.EngineError, ar.get)
26 eid = ar.engine_id
21 27 while eid in self.client.ids:
22 28 time.sleep(.01)
23 29 self.client.spin()
24 30
25 31 def test_view_indexing(self):
26 self.add_engines(4)
32 """test index access for views"""
33 self.add_engines(2)
27 34 targets = self.client._build_targets('all')[-1]
28 35 v = self.client[:]
29 36 self.assertEquals(v.targets, targets)
@@ -60,17 +67,30 @@ class TestClient(ClusterTestCase):
60 67
61 68 def test_targets(self):
62 69 """test various valid targets arguments"""
63 pass
70 build = self.client._build_targets
71 ids = self.client.ids
72 idents,targets = build(None)
73 self.assertEquals(ids, targets)
64 74
65 75 def test_clear(self):
66 76 """test clear behavior"""
67 # self.add_engines(4)
68 # self.client.push()
77 self.add_engines(2)
78 self.client.block=True
79 self.client.push(dict(a=5))
80 self.client.pull('a')
81 id0 = self.client.ids[-1]
82 self.client.clear(targets=id0)
83 self.client.pull('a', targets=self.client.ids[:-1])
84 self.assertRaisesRemote(NameError, self.client.pull, 'a')
85 self.client.clear()
86 for i in self.client.ids:
87 self.assertRaisesRemote(NameError, self.client.pull, 'a', targets=i)
88
69 89
70 90 def test_push_pull(self):
71 91 """test pushing and pulling"""
72 92 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
73 self.add_engines(4)
93 self.add_engines(2)
74 94 push = self.client.push
75 95 pull = self.client.pull
76 96 self.client.block=True
@@ -131,4 +151,15 @@ class TestClient(ClusterTestCase):
131 151 v.execute('b=f()')
132 152 self.assertEquals(v['b'], 5)
133 153
154 def test_get_result(self):
155 """test getting results from the Hub."""
156 c = clientmod.Client(profile='iptest')
157 t = self.client.ids[-1]
158 ar = c.apply(wait, (1,), block=False, targets=t)
159 time.sleep(.25)
160 ahr = self.client.get_result(ar.msg_ids)
161 self.assertTrue(isinstance(ahr, AsyncHubResult))
162 self.assertEquals(ahr.get(), ar.get())
163 ar2 = self.client.get_result(ar.msg_ids)
164 self.assertFalse(isinstance(ar2, AsyncHubResult))
134 165 No newline at end of file
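Note: test_get_result exercises both return types: msg_ids the local client has never seen are fetched from the Hub as an AsyncHubResult, while a second lookup finds them cached locally and returns a plain AsyncResult. Condensed (assumes a running 'iptest' cluster, a connected client rc, and the wait helper from clienttest):

    other = clientmod.Client(profile='iptest')          # a second, independent client
    ar = other.apply(wait, (1,), block=False, targets=other.ids[-1])
    ahr = rc.get_result(ar.msg_ids)                      # rc never saw these msg_ids
    assert isinstance(ahr, AsyncHubResult)
    assert ahr.get() == ar.get()
    ar2 = rc.get_result(ar.msg_ids)                      # now cached locally
    assert not isinstance(ar2, AsyncHubResult)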
@@ -10,6 +10,7 @@
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 from IPython.testing import decorators as testdec
13 14 from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance
14 15
15 16 from IPython.external.decorator import decorator
@@ -330,7 +331,7 @@ class View(HasTraits):
330 331 block = self.block if block is None else block
331 332 return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
332 333
333
334 @testdec.skip_doctest
334 335 class DirectView(View):
335 336 """Direct Multiplexer View of one or more engines.
336 337
@@ -413,7 +414,7 @@ class DirectView(View):
413 414 return self.client.push(ns, targets=self._targets, block=self.block)
414 415
415 416 push = update
416
417
417 418 def get(self, key_s):
418 419 """get object(s) by `key_s` from remote namespace
419 420 will return one object if it is a key.
@@ -430,26 +431,24 @@ class DirectView(View):
430 431 block = block if block is not None else self.block
431 432 return self.client.pull(key_s, block=block, targets=self._targets)
432 433
433 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
434 def scatter(self, key, seq, dist='b', flatten=False, block=None):
434 435 """
435 436 Partition a Python sequence and send the partitions to a set of engines.
436 437 """
437 438 block = block if block is not None else self.block
438 targets = targets if targets is not None else self._targets
439 439
440 440 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
441 targets=targets, block=block)
441 targets=self._targets, block=block)
442 442
443 443 @sync_results
444 444 @save_ids
445 def gather(self, key, dist='b', targets=None, block=None):
445 def gather(self, key, dist='b', block=None):
446 446 """
447 447 Gather a partitioned sequence on a set of engines as a single local seq.
448 448 """
449 449 block = block if block is not None else self.block
450 targets = targets if targets is not None else self._targets
451 450
452 return self.client.gather(key, dist=dist, targets=targets, block=block)
451 return self.client.gather(key, dist=dist, targets=self._targets, block=block)
453 452
454 453 def __getitem__(self, key):
455 454 return self.get(key)
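Note: with the targets keyword dropped, scatter and gather always operate on the view's own targets; selecting engines is done by slicing the client into a view first. A short usage sketch (rc is again an assumed connected Client):

    v = rc[:]                                # DirectView over all engines
    v.scatter('a', range(16), block=True)    # partitions land on v's targets
    v.execute('b = [2 * x for x in a]')
    doubled = v.gather('b', block=True)      # reassembled locally, in target order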
@@ -496,7 +495,8 @@ class DirectView(View):
496 495 print "You must first load the parallelmagic extension " \
497 496 "by doing '%load_ext parallelmagic'"
498 497
499
498
499 @testdec.skip_doctest
500 500 class LoadBalancedView(View):
501 501 """An load-balancing View that only executes via the Task scheduler.
502 502