Show More
@@ -35,7 +35,7 b' class AsyncResult(object):' | |||
|
35 | 35 | |
|
36 | 36 | msg_ids = None |
|
37 | 37 | |
|
38 | def __init__(self, client, msg_ids, fname=''): | |
|
38 | def __init__(self, client, msg_ids, fname='unknown'): | |
|
39 | 39 | self._client = client |
|
40 | 40 | if isinstance(msg_ids, basestring): |
|
41 | 41 | msg_ids = [msg_ids] |
@@ -265,7 +265,7 b' class AsyncHubResult(AsyncResult):' | |||
|
265 | 265 | else: |
|
266 | 266 | rdict = self._client.result_status(remote_ids, status_only=False) |
|
267 | 267 | pending = rdict['pending'] |
|
268 | while pending and time.time() < start+timeout: | |
|
268 | while pending and (timeout < 0 or time.time() < start+timeout): | |
|
269 | 269 | rdict = self._client.result_status(remote_ids, status_only=False) |
|
270 | 270 | pending = rdict['pending'] |
|
271 | 271 | if pending: |
@@ -360,16 +360,17 b' class Client(HasTraits):' | |||
|
360 | 360 | if cluster_dir is not None: |
|
361 | 361 | try: |
|
362 | 362 | self._cd = ClusterDir.find_cluster_dir(cluster_dir) |
|
363 | return | |
|
363 | 364 | except ClusterDirError: |
|
364 | 365 | pass |
|
365 | 366 | elif profile is not None: |
|
366 | 367 | try: |
|
367 | 368 | self._cd = ClusterDir.find_cluster_dir_by_profile( |
|
368 | 369 | ipython_dir, profile) |
|
370 | return | |
|
369 | 371 | except ClusterDirError: |
|
370 | 372 | pass |
|
371 |
|
|
|
372 | self._cd = None | |
|
373 | self._cd = None | |
|
373 | 374 | |
|
374 | 375 | @property |
|
375 | 376 | def ids(self): |
@@ -489,9 +490,9 b' class Client(HasTraits):' | |||
|
489 | 490 | """unwrap exception, and remap engineid to int.""" |
|
490 | 491 | e = ss.unwrap_exception(content) |
|
491 | 492 | if e.engine_info: |
|
492 | e_uuid = e.engine_info['engineid'] | |
|
493 | e_uuid = e.engine_info['engine_uuid'] | |
|
493 | 494 | eid = self._engines[e_uuid] |
|
494 | e.engine_info['engineid'] = eid | |
|
495 | e.engine_info['engine_id'] = eid | |
|
495 | 496 | return e |
|
496 | 497 | |
|
497 | 498 | def _register_engine(self, msg): |
@@ -1338,11 +1339,11 b' class Client(HasTraits):' | |||
|
1338 | 1339 | be lists of msg_ids that are incomplete or complete. If `status_only` |
|
1339 | 1340 | is False, then completed results will be keyed by their `msg_id`. |
|
1340 | 1341 | """ |
|
1341 |
if not isinstance( |
|
|
1342 |
indices_or_msg_ids = [ |
|
|
1342 | if not isinstance(msg_ids, (list,tuple)): | |
|
1343 | indices_or_msg_ids = [msg_ids] | |
|
1343 | 1344 | |
|
1344 | 1345 | theids = [] |
|
1345 |
for msg_id in |
|
|
1346 | for msg_id in msg_ids: | |
|
1346 | 1347 | if isinstance(msg_id, int): |
|
1347 | 1348 | msg_id = self.history[msg_id] |
|
1348 | 1349 | if not isinstance(msg_id, basestring): |
@@ -175,7 +175,7 b' class RemoteError(KernelError):' | |||
|
175 | 175 | self.args=(ename, evalue) |
|
176 | 176 | |
|
177 | 177 | def __repr__(self): |
|
178 | engineid = self.engine_info.get('engineid', ' ') | |
|
178 | engineid = self.engine_info.get('engine_id', ' ') | |
|
179 | 179 | return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue) |
|
180 | 180 | |
|
181 | 181 | def __str__(self): |
@@ -702,7 +702,7 b' class Hub(LoggingFactory):' | |||
|
702 | 702 | self.log.error("task::invalid task tracking message", exc_info=True) |
|
703 | 703 | return |
|
704 | 704 | content = msg['content'] |
|
705 | print (content) | |
|
705 | # print (content) | |
|
706 | 706 | msg_id = content['msg_id'] |
|
707 | 707 | engine_uuid = content['engine_id'] |
|
708 | 708 | eid = self.by_ident[engine_uuid] |
@@ -728,7 +728,7 b' class Hub(LoggingFactory):' | |||
|
728 | 728 | |
|
729 | 729 | def save_iopub_message(self, topics, msg): |
|
730 | 730 | """save an iopub message into the db""" |
|
731 | print (topics) | |
|
731 | # print (topics) | |
|
732 | 732 | try: |
|
733 | 733 | msg = self.session.unpack_message(msg, content=True) |
|
734 | 734 | except: |
@@ -12,6 +12,8 b'' | |||
|
12 | 12 | |
|
13 | 13 | import warnings |
|
14 | 14 | |
|
15 | from IPython.testing import decorators as testdec | |
|
16 | ||
|
15 | 17 | import map as Map |
|
16 | 18 | from asyncresult import AsyncMapResult |
|
17 | 19 | |
@@ -19,26 +21,32 b' from asyncresult import AsyncMapResult' | |||
|
19 | 21 | # Decorators |
|
20 | 22 | #----------------------------------------------------------------------------- |
|
21 | 23 | |
|
24 | @testdec.skip_doctest | |
|
22 | 25 | def remote(client, bound=True, block=None, targets=None, balanced=None): |
|
23 | 26 | """Turn a function into a remote function. |
|
24 | 27 | |
|
25 | 28 | This method can be used for map: |
|
26 | 29 | |
|
27 |
|
|
|
28 | def func(a) | |
|
30 | In [1]: @remote(client,block=True) | |
|
31 | ...: def func(a): | |
|
32 | ...: pass | |
|
29 | 33 | """ |
|
34 | ||
|
30 | 35 | def remote_function(f): |
|
31 | 36 | return RemoteFunction(client, f, bound, block, targets, balanced) |
|
32 | 37 | return remote_function |
|
33 | 38 | |
|
39 | @testdec.skip_doctest | |
|
34 | 40 | def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None): |
|
35 | 41 | """Turn a function into a parallel remote function. |
|
36 | 42 | |
|
37 | 43 | This method can be used for map: |
|
38 | 44 | |
|
39 |
|
|
|
40 | def func(a) | |
|
45 | In [1]: @parallel(client,block=True) | |
|
46 | ...: def func(a): | |
|
47 | ...: pass | |
|
41 | 48 | """ |
|
49 | ||
|
42 | 50 | def parallel_function(f): |
|
43 | 51 | return ParallelFunction(client, f, dist, bound, block, targets, balanced) |
|
44 | 52 | return parallel_function |
@@ -104,7 +104,7 b' class Kernel(SessionFactory):' | |||
|
104 | 104 | self._initial_exec_lines() |
|
105 | 105 | |
|
106 | 106 | def _wrap_exception(self, method=None): |
|
107 | e_info = dict(engineid=self.ident, method=method) | |
|
107 | e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method) | |
|
108 | 108 | content=wrap_exception(e_info) |
|
109 | 109 | return content |
|
110 | 110 |
@@ -29,7 +29,6 b' def teardown():' | |||
|
29 | 29 | p = processes.pop() |
|
30 | 30 | if p.poll() is None: |
|
31 | 31 | try: |
|
32 | print 'terminating' | |
|
33 | 32 | p.terminate() |
|
34 | 33 | except Exception, e: |
|
35 | 34 | print e |
@@ -17,7 +17,7 b' from IPython.zmq.parallel.tests import processes,add_engine' | |||
|
17 | 17 | # simple tasks for use in apply tests |
|
18 | 18 | |
|
19 | 19 | def segfault(): |
|
20 | """""" | |
|
20 | """this will segfault""" | |
|
21 | 21 | import ctypes |
|
22 | 22 | ctypes.memset(-1,0,1) |
|
23 | 23 | |
@@ -73,9 +73,10 b' class ClusterTestCase(BaseZMQTestCase):' | |||
|
73 | 73 | |
|
74 | 74 | def assertRaisesRemote(self, etype, f, *args, **kwargs): |
|
75 | 75 | try: |
|
76 | f(*args, **kwargs) | |
|
77 | except error.CompositeError as e: | |
|
78 | e.raise_exception() | |
|
76 | try: | |
|
77 | f(*args, **kwargs) | |
|
78 | except error.CompositeError as e: | |
|
79 | e.raise_exception() | |
|
79 | 80 | except error.RemoteError as e: |
|
80 | 81 | self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(e.ename, etype.__name__)) |
|
81 | 82 | else: |
@@ -87,10 +88,11 b' class ClusterTestCase(BaseZMQTestCase):' | |||
|
87 | 88 | self.base_engine_count=len(self.client.ids) |
|
88 | 89 | self.engines=[] |
|
89 | 90 | |
|
90 | def tearDown(self): | |
|
91 | [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ] | |
|
92 | # while len(self.client.ids) > self.base_engine_count: | |
|
93 | # time.sleep(.1) | |
|
94 | del self.engines | |
|
95 | BaseZMQTestCase.tearDown(self) | |
|
91 | # def tearDown(self): | |
|
92 | # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ] | |
|
93 | # [ e.wait() for e in self.engines ] | |
|
94 | # while len(self.client.ids) > self.base_engine_count: | |
|
95 | # time.sleep(.1) | |
|
96 | # del self.engines | |
|
97 | # BaseZMQTestCase.tearDown(self) | |
|
96 | 98 | No newline at end of file |
@@ -2,28 +2,35 b' import time' | |||
|
2 | 2 | |
|
3 | 3 | import nose.tools as nt |
|
4 | 4 | |
|
5 |
from IPython.zmq.parallel |
|
|
5 | from IPython.zmq.parallel import client as clientmod | |
|
6 | from IPython.zmq.parallel import error | |
|
7 | from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult | |
|
6 | 8 | from IPython.zmq.parallel.view import LoadBalancedView, DirectView |
|
7 | 9 | |
|
8 | from clienttest import ClusterTestCase, segfault | |
|
10 | from clienttest import ClusterTestCase, segfault, wait | |
|
9 | 11 | |
|
10 | 12 | class TestClient(ClusterTestCase): |
|
11 | 13 | |
|
12 | 14 | def test_ids(self): |
|
13 |
|
|
|
15 | n = len(self.client.ids) | |
|
14 | 16 | self.add_engines(3) |
|
15 |
self.assertEquals(len(self.client.ids), |
|
|
17 | self.assertEquals(len(self.client.ids), n+3) | |
|
18 | self.assertTrue | |
|
16 | 19 | |
|
17 | 20 | def test_segfault(self): |
|
21 | """test graceful handling of engine death""" | |
|
18 | 22 | self.add_engines(1) |
|
19 | 23 | eid = self.client.ids[-1] |
|
20 |
self.client |
|
|
24 | ar = self.client.apply(segfault, block=False) | |
|
25 | self.assertRaisesRemote(error.EngineError, ar.get) | |
|
26 | eid = ar.engine_id | |
|
21 | 27 | while eid in self.client.ids: |
|
22 | 28 | time.sleep(.01) |
|
23 | 29 | self.client.spin() |
|
24 | 30 | |
|
25 | 31 | def test_view_indexing(self): |
|
26 | self.add_engines(4) | |
|
32 | """test index access for views""" | |
|
33 | self.add_engines(2) | |
|
27 | 34 | targets = self.client._build_targets('all')[-1] |
|
28 | 35 | v = self.client[:] |
|
29 | 36 | self.assertEquals(v.targets, targets) |
@@ -60,17 +67,30 b' class TestClient(ClusterTestCase):' | |||
|
60 | 67 | |
|
61 | 68 | def test_targets(self): |
|
62 | 69 | """test various valid targets arguments""" |
|
63 | pass | |
|
70 | build = self.client._build_targets | |
|
71 | ids = self.client.ids | |
|
72 | idents,targets = build(None) | |
|
73 | self.assertEquals(ids, targets) | |
|
64 | 74 | |
|
65 | 75 | def test_clear(self): |
|
66 | 76 | """test clear behavior""" |
|
67 |
|
|
|
68 |
|
|
|
77 | self.add_engines(2) | |
|
78 | self.client.block=True | |
|
79 | self.client.push(dict(a=5)) | |
|
80 | self.client.pull('a') | |
|
81 | id0 = self.client.ids[-1] | |
|
82 | self.client.clear(targets=id0) | |
|
83 | self.client.pull('a', targets=self.client.ids[:-1]) | |
|
84 | self.assertRaisesRemote(NameError, self.client.pull, 'a') | |
|
85 | self.client.clear() | |
|
86 | for i in self.client.ids: | |
|
87 | self.assertRaisesRemote(NameError, self.client.pull, 'a', targets=i) | |
|
88 | ||
|
69 | 89 | |
|
70 | 90 | def test_push_pull(self): |
|
71 | 91 | """test pushing and pulling""" |
|
72 | 92 | data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'}) |
|
73 |
self.add_engines( |
|
|
93 | self.add_engines(2) | |
|
74 | 94 | push = self.client.push |
|
75 | 95 | pull = self.client.pull |
|
76 | 96 | self.client.block=True |
@@ -131,4 +151,15 b' class TestClient(ClusterTestCase):' | |||
|
131 | 151 | v.execute('b=f()') |
|
132 | 152 | self.assertEquals(v['b'], 5) |
|
133 | 153 | |
|
154 | def test_get_result(self): | |
|
155 | """test getting results from the Hub.""" | |
|
156 | c = clientmod.Client(profile='iptest') | |
|
157 | t = self.client.ids[-1] | |
|
158 | ar = c.apply(wait, (1,), block=False, targets=t) | |
|
159 | time.sleep(.25) | |
|
160 | ahr = self.client.get_result(ar.msg_ids) | |
|
161 | self.assertTrue(isinstance(ahr, AsyncHubResult)) | |
|
162 | self.assertEquals(ahr.get(), ar.get()) | |
|
163 | ar2 = self.client.get_result(ar.msg_ids) | |
|
164 | self.assertFalse(isinstance(ar2, AsyncHubResult)) | |
|
134 | 165 | No newline at end of file |
@@ -10,6 +10,7 b'' | |||
|
10 | 10 | # Imports |
|
11 | 11 | #----------------------------------------------------------------------------- |
|
12 | 12 | |
|
13 | from IPython.testing import decorators as testdec | |
|
13 | 14 | from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance |
|
14 | 15 | |
|
15 | 16 | from IPython.external.decorator import decorator |
@@ -330,7 +331,7 b' class View(HasTraits):' | |||
|
330 | 331 | block = self.block if block is None else block |
|
331 | 332 | return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced) |
|
332 | 333 | |
|
333 | ||
|
334 | @testdec.skip_doctest | |
|
334 | 335 | class DirectView(View): |
|
335 | 336 | """Direct Multiplexer View of one or more engines. |
|
336 | 337 | |
@@ -413,7 +414,7 b' class DirectView(View):' | |||
|
413 | 414 | return self.client.push(ns, targets=self._targets, block=self.block) |
|
414 | 415 | |
|
415 | 416 | push = update |
|
416 | ||
|
417 | ||
|
417 | 418 | def get(self, key_s): |
|
418 | 419 | """get object(s) by `key_s` from remote namespace |
|
419 | 420 | will return one object if it is a key. |
@@ -430,26 +431,24 b' class DirectView(View):' | |||
|
430 | 431 | block = block if block is not None else self.block |
|
431 | 432 | return self.client.pull(key_s, block=block, targets=self._targets) |
|
432 | 433 | |
|
433 |
def scatter(self, key, seq, dist='b', flatten=False, |
|
|
434 | def scatter(self, key, seq, dist='b', flatten=False, block=None): | |
|
434 | 435 | """ |
|
435 | 436 | Partition a Python sequence and send the partitions to a set of engines. |
|
436 | 437 | """ |
|
437 | 438 | block = block if block is not None else self.block |
|
438 | targets = targets if targets is not None else self._targets | |
|
439 | 439 | |
|
440 | 440 | return self.client.scatter(key, seq, dist=dist, flatten=flatten, |
|
441 | targets=targets, block=block) | |
|
441 | targets=self._targets, block=block) | |
|
442 | 442 | |
|
443 | 443 | @sync_results |
|
444 | 444 | @save_ids |
|
445 |
def gather(self, key, dist='b', |
|
|
445 | def gather(self, key, dist='b', block=None): | |
|
446 | 446 | """ |
|
447 | 447 | Gather a partitioned sequence on a set of engines as a single local seq. |
|
448 | 448 | """ |
|
449 | 449 | block = block if block is not None else self.block |
|
450 | targets = targets if targets is not None else self._targets | |
|
451 | 450 | |
|
452 | return self.client.gather(key, dist=dist, targets=targets, block=block) | |
|
451 | return self.client.gather(key, dist=dist, targets=self._targets, block=block) | |
|
453 | 452 | |
|
454 | 453 | def __getitem__(self, key): |
|
455 | 454 | return self.get(key) |
@@ -496,7 +495,8 b' class DirectView(View):' | |||
|
496 | 495 | print "You must first load the parallelmagic extension " \ |
|
497 | 496 | "by doing '%load_ext parallelmagic'" |
|
498 | 497 | |
|
499 | ||
|
498 | ||
|
499 | @testdec.skip_doctest | |
|
500 | 500 | class LoadBalancedView(View): |
|
501 | 501 | """An load-balancing View that only executes via the Task scheduler. |
|
502 | 502 |
General Comments 0
You need to be logged in to leave comments.
Login now