##// END OF EJS Templates
testing fixes
MinRK -
Show More
@@ -35,7 +35,7 b' class AsyncResult(object):'
35
35
36 msg_ids = None
36 msg_ids = None
37
37
38 def __init__(self, client, msg_ids, fname=''):
38 def __init__(self, client, msg_ids, fname='unknown'):
39 self._client = client
39 self._client = client
40 if isinstance(msg_ids, basestring):
40 if isinstance(msg_ids, basestring):
41 msg_ids = [msg_ids]
41 msg_ids = [msg_ids]
@@ -265,7 +265,7 b' class AsyncHubResult(AsyncResult):'
265 else:
265 else:
266 rdict = self._client.result_status(remote_ids, status_only=False)
266 rdict = self._client.result_status(remote_ids, status_only=False)
267 pending = rdict['pending']
267 pending = rdict['pending']
268 while pending and time.time() < start+timeout:
268 while pending and (timeout < 0 or time.time() < start+timeout):
269 rdict = self._client.result_status(remote_ids, status_only=False)
269 rdict = self._client.result_status(remote_ids, status_only=False)
270 pending = rdict['pending']
270 pending = rdict['pending']
271 if pending:
271 if pending:
@@ -360,15 +360,16 b' class Client(HasTraits):'
360 if cluster_dir is not None:
360 if cluster_dir is not None:
361 try:
361 try:
362 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
362 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
363 return
363 except ClusterDirError:
364 except ClusterDirError:
364 pass
365 pass
365 elif profile is not None:
366 elif profile is not None:
366 try:
367 try:
367 self._cd = ClusterDir.find_cluster_dir_by_profile(
368 self._cd = ClusterDir.find_cluster_dir_by_profile(
368 ipython_dir, profile)
369 ipython_dir, profile)
370 return
369 except ClusterDirError:
371 except ClusterDirError:
370 pass
372 pass
371 else:
372 self._cd = None
373 self._cd = None
373
374
374 @property
375 @property
@@ -489,9 +490,9 b' class Client(HasTraits):'
489 """unwrap exception, and remap engineid to int."""
490 """unwrap exception, and remap engineid to int."""
490 e = ss.unwrap_exception(content)
491 e = ss.unwrap_exception(content)
491 if e.engine_info:
492 if e.engine_info:
492 e_uuid = e.engine_info['engineid']
493 e_uuid = e.engine_info['engine_uuid']
493 eid = self._engines[e_uuid]
494 eid = self._engines[e_uuid]
494 e.engine_info['engineid'] = eid
495 e.engine_info['engine_id'] = eid
495 return e
496 return e
496
497
497 def _register_engine(self, msg):
498 def _register_engine(self, msg):
@@ -1338,11 +1339,11 b' class Client(HasTraits):'
1338 be lists of msg_ids that are incomplete or complete. If `status_only`
1339 be lists of msg_ids that are incomplete or complete. If `status_only`
1339 is False, then completed results will be keyed by their `msg_id`.
1340 is False, then completed results will be keyed by their `msg_id`.
1340 """
1341 """
1341 if not isinstance(indices_or_msg_ids, (list,tuple)):
1342 if not isinstance(msg_ids, (list,tuple)):
1342 indices_or_msg_ids = [indices_or_msg_ids]
1343 indices_or_msg_ids = [msg_ids]
1343
1344
1344 theids = []
1345 theids = []
1345 for msg_id in indices_or_msg_ids:
1346 for msg_id in msg_ids:
1346 if isinstance(msg_id, int):
1347 if isinstance(msg_id, int):
1347 msg_id = self.history[msg_id]
1348 msg_id = self.history[msg_id]
1348 if not isinstance(msg_id, basestring):
1349 if not isinstance(msg_id, basestring):
@@ -175,7 +175,7 b' class RemoteError(KernelError):'
175 self.args=(ename, evalue)
175 self.args=(ename, evalue)
176
176
177 def __repr__(self):
177 def __repr__(self):
178 engineid = self.engine_info.get('engineid', ' ')
178 engineid = self.engine_info.get('engine_id', ' ')
179 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
179 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
180
180
181 def __str__(self):
181 def __str__(self):
@@ -702,7 +702,7 b' class Hub(LoggingFactory):'
702 self.log.error("task::invalid task tracking message", exc_info=True)
702 self.log.error("task::invalid task tracking message", exc_info=True)
703 return
703 return
704 content = msg['content']
704 content = msg['content']
705 print (content)
705 # print (content)
706 msg_id = content['msg_id']
706 msg_id = content['msg_id']
707 engine_uuid = content['engine_id']
707 engine_uuid = content['engine_id']
708 eid = self.by_ident[engine_uuid]
708 eid = self.by_ident[engine_uuid]
@@ -728,7 +728,7 b' class Hub(LoggingFactory):'
728
728
729 def save_iopub_message(self, topics, msg):
729 def save_iopub_message(self, topics, msg):
730 """save an iopub message into the db"""
730 """save an iopub message into the db"""
731 print (topics)
731 # print (topics)
732 try:
732 try:
733 msg = self.session.unpack_message(msg, content=True)
733 msg = self.session.unpack_message(msg, content=True)
734 except:
734 except:
@@ -12,6 +12,8 b''
12
12
13 import warnings
13 import warnings
14
14
15 from IPython.testing import decorators as testdec
16
15 import map as Map
17 import map as Map
16 from asyncresult import AsyncMapResult
18 from asyncresult import AsyncMapResult
17
19
@@ -19,26 +21,32 b' from asyncresult import AsyncMapResult'
19 # Decorators
21 # Decorators
20 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
21
23
24 @testdec.skip_doctest
22 def remote(client, bound=True, block=None, targets=None, balanced=None):
25 def remote(client, bound=True, block=None, targets=None, balanced=None):
23 """Turn a function into a remote function.
26 """Turn a function into a remote function.
24
27
25 This method can be used for map:
28 This method can be used for map:
26
29
27 >>> @remote(client,block=True)
30 In [1]: @remote(client,block=True)
28 def func(a)
31 ...: def func(a):
32 ...: pass
29 """
33 """
34
30 def remote_function(f):
35 def remote_function(f):
31 return RemoteFunction(client, f, bound, block, targets, balanced)
36 return RemoteFunction(client, f, bound, block, targets, balanced)
32 return remote_function
37 return remote_function
33
38
39 @testdec.skip_doctest
34 def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None):
40 def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None):
35 """Turn a function into a parallel remote function.
41 """Turn a function into a parallel remote function.
36
42
37 This method can be used for map:
43 This method can be used for map:
38
44
39 >>> @parallel(client,block=True)
45 In [1]: @parallel(client,block=True)
40 def func(a)
46 ...: def func(a):
47 ...: pass
41 """
48 """
49
42 def parallel_function(f):
50 def parallel_function(f):
43 return ParallelFunction(client, f, dist, bound, block, targets, balanced)
51 return ParallelFunction(client, f, dist, bound, block, targets, balanced)
44 return parallel_function
52 return parallel_function
@@ -104,7 +104,7 b' class Kernel(SessionFactory):'
104 self._initial_exec_lines()
104 self._initial_exec_lines()
105
105
106 def _wrap_exception(self, method=None):
106 def _wrap_exception(self, method=None):
107 e_info = dict(engineid=self.ident, method=method)
107 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
108 content=wrap_exception(e_info)
108 content=wrap_exception(e_info)
109 return content
109 return content
110
110
@@ -29,7 +29,6 b' def teardown():'
29 p = processes.pop()
29 p = processes.pop()
30 if p.poll() is None:
30 if p.poll() is None:
31 try:
31 try:
32 print 'terminating'
33 p.terminate()
32 p.terminate()
34 except Exception, e:
33 except Exception, e:
35 print e
34 print e
@@ -17,7 +17,7 b' from IPython.zmq.parallel.tests import processes,add_engine'
17 # simple tasks for use in apply tests
17 # simple tasks for use in apply tests
18
18
19 def segfault():
19 def segfault():
20 """"""
20 """this will segfault"""
21 import ctypes
21 import ctypes
22 ctypes.memset(-1,0,1)
22 ctypes.memset(-1,0,1)
23
23
@@ -73,6 +73,7 b' class ClusterTestCase(BaseZMQTestCase):'
73
73
74 def assertRaisesRemote(self, etype, f, *args, **kwargs):
74 def assertRaisesRemote(self, etype, f, *args, **kwargs):
75 try:
75 try:
76 try:
76 f(*args, **kwargs)
77 f(*args, **kwargs)
77 except error.CompositeError as e:
78 except error.CompositeError as e:
78 e.raise_exception()
79 e.raise_exception()
@@ -87,10 +88,11 b' class ClusterTestCase(BaseZMQTestCase):'
87 self.base_engine_count=len(self.client.ids)
88 self.base_engine_count=len(self.client.ids)
88 self.engines=[]
89 self.engines=[]
89
90
90 def tearDown(self):
91 # def tearDown(self):
91 [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
92 # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ]
93 # [ e.wait() for e in self.engines ]
92 # while len(self.client.ids) > self.base_engine_count:
94 # while len(self.client.ids) > self.base_engine_count:
93 # time.sleep(.1)
95 # time.sleep(.1)
94 del self.engines
96 # del self.engines
95 BaseZMQTestCase.tearDown(self)
97 # BaseZMQTestCase.tearDown(self)
96 No newline at end of file
98
@@ -2,28 +2,35 b' import time'
2
2
3 import nose.tools as nt
3 import nose.tools as nt
4
4
5 from IPython.zmq.parallel.asyncresult import AsyncResult
5 from IPython.zmq.parallel import client as clientmod
6 from IPython.zmq.parallel import error
7 from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult
6 from IPython.zmq.parallel.view import LoadBalancedView, DirectView
8 from IPython.zmq.parallel.view import LoadBalancedView, DirectView
7
9
8 from clienttest import ClusterTestCase, segfault
10 from clienttest import ClusterTestCase, segfault, wait
9
11
10 class TestClient(ClusterTestCase):
12 class TestClient(ClusterTestCase):
11
13
12 def test_ids(self):
14 def test_ids(self):
13 self.assertEquals(len(self.client.ids), 1)
15 n = len(self.client.ids)
14 self.add_engines(3)
16 self.add_engines(3)
15 self.assertEquals(len(self.client.ids), 4)
17 self.assertEquals(len(self.client.ids), n+3)
18 self.assertTrue
16
19
17 def test_segfault(self):
20 def test_segfault(self):
21 """test graceful handling of engine death"""
18 self.add_engines(1)
22 self.add_engines(1)
19 eid = self.client.ids[-1]
23 eid = self.client.ids[-1]
20 self.client[eid].apply(segfault)
24 ar = self.client.apply(segfault, block=False)
25 self.assertRaisesRemote(error.EngineError, ar.get)
26 eid = ar.engine_id
21 while eid in self.client.ids:
27 while eid in self.client.ids:
22 time.sleep(.01)
28 time.sleep(.01)
23 self.client.spin()
29 self.client.spin()
24
30
25 def test_view_indexing(self):
31 def test_view_indexing(self):
26 self.add_engines(4)
32 """test index access for views"""
33 self.add_engines(2)
27 targets = self.client._build_targets('all')[-1]
34 targets = self.client._build_targets('all')[-1]
28 v = self.client[:]
35 v = self.client[:]
29 self.assertEquals(v.targets, targets)
36 self.assertEquals(v.targets, targets)
@@ -60,17 +67,30 b' class TestClient(ClusterTestCase):'
60
67
61 def test_targets(self):
68 def test_targets(self):
62 """test various valid targets arguments"""
69 """test various valid targets arguments"""
63 pass
70 build = self.client._build_targets
71 ids = self.client.ids
72 idents,targets = build(None)
73 self.assertEquals(ids, targets)
64
74
65 def test_clear(self):
75 def test_clear(self):
66 """test clear behavior"""
76 """test clear behavior"""
67 # self.add_engines(4)
77 self.add_engines(2)
68 # self.client.push()
78 self.client.block=True
79 self.client.push(dict(a=5))
80 self.client.pull('a')
81 id0 = self.client.ids[-1]
82 self.client.clear(targets=id0)
83 self.client.pull('a', targets=self.client.ids[:-1])
84 self.assertRaisesRemote(NameError, self.client.pull, 'a')
85 self.client.clear()
86 for i in self.client.ids:
87 self.assertRaisesRemote(NameError, self.client.pull, 'a', targets=i)
88
69
89
70 def test_push_pull(self):
90 def test_push_pull(self):
71 """test pushing and pulling"""
91 """test pushing and pulling"""
72 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
92 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
73 self.add_engines(4)
93 self.add_engines(2)
74 push = self.client.push
94 push = self.client.push
75 pull = self.client.pull
95 pull = self.client.pull
76 self.client.block=True
96 self.client.block=True
@@ -131,4 +151,15 b' class TestClient(ClusterTestCase):'
131 v.execute('b=f()')
151 v.execute('b=f()')
132 self.assertEquals(v['b'], 5)
152 self.assertEquals(v['b'], 5)
133
153
154 def test_get_result(self):
155 """test getting results from the Hub."""
156 c = clientmod.Client(profile='iptest')
157 t = self.client.ids[-1]
158 ar = c.apply(wait, (1,), block=False, targets=t)
159 time.sleep(.25)
160 ahr = self.client.get_result(ar.msg_ids)
161 self.assertTrue(isinstance(ahr, AsyncHubResult))
162 self.assertEquals(ahr.get(), ar.get())
163 ar2 = self.client.get_result(ar.msg_ids)
164 self.assertFalse(isinstance(ar2, AsyncHubResult))
134 No newline at end of file
165
@@ -10,6 +10,7 b''
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.testing import decorators as testdec
13 from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance
14 from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance
14
15
15 from IPython.external.decorator import decorator
16 from IPython.external.decorator import decorator
@@ -330,7 +331,7 b' class View(HasTraits):'
330 block = self.block if block is None else block
331 block = self.block if block is None else block
331 return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
332 return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
332
333
333
334 @testdec.skip_doctest
334 class DirectView(View):
335 class DirectView(View):
335 """Direct Multiplexer View of one or more engines.
336 """Direct Multiplexer View of one or more engines.
336
337
@@ -430,26 +431,24 b' class DirectView(View):'
430 block = block if block is not None else self.block
431 block = block if block is not None else self.block
431 return self.client.pull(key_s, block=block, targets=self._targets)
432 return self.client.pull(key_s, block=block, targets=self._targets)
432
433
433 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
434 def scatter(self, key, seq, dist='b', flatten=False, block=None):
434 """
435 """
435 Partition a Python sequence and send the partitions to a set of engines.
436 Partition a Python sequence and send the partitions to a set of engines.
436 """
437 """
437 block = block if block is not None else self.block
438 block = block if block is not None else self.block
438 targets = targets if targets is not None else self._targets
439
439
440 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
440 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
441 targets=targets, block=block)
441 targets=self._targets, block=block)
442
442
443 @sync_results
443 @sync_results
444 @save_ids
444 @save_ids
445 def gather(self, key, dist='b', targets=None, block=None):
445 def gather(self, key, dist='b', block=None):
446 """
446 """
447 Gather a partitioned sequence on a set of engines as a single local seq.
447 Gather a partitioned sequence on a set of engines as a single local seq.
448 """
448 """
449 block = block if block is not None else self.block
449 block = block if block is not None else self.block
450 targets = targets if targets is not None else self._targets
451
450
452 return self.client.gather(key, dist=dist, targets=targets, block=block)
451 return self.client.gather(key, dist=dist, targets=self._targets, block=block)
453
452
454 def __getitem__(self, key):
453 def __getitem__(self, key):
455 return self.get(key)
454 return self.get(key)
@@ -497,6 +496,7 b' class DirectView(View):'
497 "by doing '%load_ext parallelmagic'"
496 "by doing '%load_ext parallelmagic'"
498
497
499
498
499 @testdec.skip_doctest
500 class LoadBalancedView(View):
500 class LoadBalancedView(View):
501 """An load-balancing View that only executes via the Task scheduler.
501 """An load-balancing View that only executes via the Task scheduler.
502
502
General Comments 0
You need to be logged in to leave comments. Login now