Show More
@@ -35,7 +35,7 b' class AsyncResult(object):' | |||||
35 |
|
35 | |||
36 | msg_ids = None |
|
36 | msg_ids = None | |
37 |
|
37 | |||
38 | def __init__(self, client, msg_ids, fname=''): |
|
38 | def __init__(self, client, msg_ids, fname='unknown'): | |
39 | self._client = client |
|
39 | self._client = client | |
40 | if isinstance(msg_ids, basestring): |
|
40 | if isinstance(msg_ids, basestring): | |
41 | msg_ids = [msg_ids] |
|
41 | msg_ids = [msg_ids] | |
@@ -265,7 +265,7 b' class AsyncHubResult(AsyncResult):' | |||||
265 | else: |
|
265 | else: | |
266 | rdict = self._client.result_status(remote_ids, status_only=False) |
|
266 | rdict = self._client.result_status(remote_ids, status_only=False) | |
267 | pending = rdict['pending'] |
|
267 | pending = rdict['pending'] | |
268 | while pending and time.time() < start+timeout: |
|
268 | while pending and (timeout < 0 or time.time() < start+timeout): | |
269 | rdict = self._client.result_status(remote_ids, status_only=False) |
|
269 | rdict = self._client.result_status(remote_ids, status_only=False) | |
270 | pending = rdict['pending'] |
|
270 | pending = rdict['pending'] | |
271 | if pending: |
|
271 | if pending: |
@@ -360,16 +360,17 b' class Client(HasTraits):' | |||||
360 | if cluster_dir is not None: |
|
360 | if cluster_dir is not None: | |
361 | try: |
|
361 | try: | |
362 | self._cd = ClusterDir.find_cluster_dir(cluster_dir) |
|
362 | self._cd = ClusterDir.find_cluster_dir(cluster_dir) | |
|
363 | return | |||
363 | except ClusterDirError: |
|
364 | except ClusterDirError: | |
364 | pass |
|
365 | pass | |
365 | elif profile is not None: |
|
366 | elif profile is not None: | |
366 | try: |
|
367 | try: | |
367 | self._cd = ClusterDir.find_cluster_dir_by_profile( |
|
368 | self._cd = ClusterDir.find_cluster_dir_by_profile( | |
368 | ipython_dir, profile) |
|
369 | ipython_dir, profile) | |
|
370 | return | |||
369 | except ClusterDirError: |
|
371 | except ClusterDirError: | |
370 | pass |
|
372 | pass | |
371 |
|
|
373 | self._cd = None | |
372 | self._cd = None |
|
|||
373 |
|
374 | |||
374 | @property |
|
375 | @property | |
375 | def ids(self): |
|
376 | def ids(self): | |
@@ -489,9 +490,9 b' class Client(HasTraits):' | |||||
489 | """unwrap exception, and remap engineid to int.""" |
|
490 | """unwrap exception, and remap engineid to int.""" | |
490 | e = ss.unwrap_exception(content) |
|
491 | e = ss.unwrap_exception(content) | |
491 | if e.engine_info: |
|
492 | if e.engine_info: | |
492 | e_uuid = e.engine_info['engineid'] |
|
493 | e_uuid = e.engine_info['engine_uuid'] | |
493 | eid = self._engines[e_uuid] |
|
494 | eid = self._engines[e_uuid] | |
494 | e.engine_info['engineid'] = eid |
|
495 | e.engine_info['engine_id'] = eid | |
495 | return e |
|
496 | return e | |
496 |
|
497 | |||
497 | def _register_engine(self, msg): |
|
498 | def _register_engine(self, msg): | |
@@ -1338,11 +1339,11 b' class Client(HasTraits):' | |||||
1338 | be lists of msg_ids that are incomplete or complete. If `status_only` |
|
1339 | be lists of msg_ids that are incomplete or complete. If `status_only` | |
1339 | is False, then completed results will be keyed by their `msg_id`. |
|
1340 | is False, then completed results will be keyed by their `msg_id`. | |
1340 | """ |
|
1341 | """ | |
1341 |
if not isinstance( |
|
1342 | if not isinstance(msg_ids, (list,tuple)): | |
1342 |
indices_or_msg_ids = [ |
|
1343 | indices_or_msg_ids = [msg_ids] | |
1343 |
|
1344 | |||
1344 | theids = [] |
|
1345 | theids = [] | |
1345 |
for msg_id in |
|
1346 | for msg_id in msg_ids: | |
1346 | if isinstance(msg_id, int): |
|
1347 | if isinstance(msg_id, int): | |
1347 | msg_id = self.history[msg_id] |
|
1348 | msg_id = self.history[msg_id] | |
1348 | if not isinstance(msg_id, basestring): |
|
1349 | if not isinstance(msg_id, basestring): |
@@ -175,7 +175,7 b' class RemoteError(KernelError):' | |||||
175 | self.args=(ename, evalue) |
|
175 | self.args=(ename, evalue) | |
176 |
|
176 | |||
177 | def __repr__(self): |
|
177 | def __repr__(self): | |
178 | engineid = self.engine_info.get('engineid', ' ') |
|
178 | engineid = self.engine_info.get('engine_id', ' ') | |
179 | return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue) |
|
179 | return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue) | |
180 |
|
180 | |||
181 | def __str__(self): |
|
181 | def __str__(self): |
@@ -702,7 +702,7 b' class Hub(LoggingFactory):' | |||||
702 | self.log.error("task::invalid task tracking message", exc_info=True) |
|
702 | self.log.error("task::invalid task tracking message", exc_info=True) | |
703 | return |
|
703 | return | |
704 | content = msg['content'] |
|
704 | content = msg['content'] | |
705 | print (content) |
|
705 | # print (content) | |
706 | msg_id = content['msg_id'] |
|
706 | msg_id = content['msg_id'] | |
707 | engine_uuid = content['engine_id'] |
|
707 | engine_uuid = content['engine_id'] | |
708 | eid = self.by_ident[engine_uuid] |
|
708 | eid = self.by_ident[engine_uuid] | |
@@ -728,7 +728,7 b' class Hub(LoggingFactory):' | |||||
728 |
|
728 | |||
729 | def save_iopub_message(self, topics, msg): |
|
729 | def save_iopub_message(self, topics, msg): | |
730 | """save an iopub message into the db""" |
|
730 | """save an iopub message into the db""" | |
731 | print (topics) |
|
731 | # print (topics) | |
732 | try: |
|
732 | try: | |
733 | msg = self.session.unpack_message(msg, content=True) |
|
733 | msg = self.session.unpack_message(msg, content=True) | |
734 | except: |
|
734 | except: |
@@ -12,6 +12,8 b'' | |||||
12 |
|
12 | |||
13 | import warnings |
|
13 | import warnings | |
14 |
|
14 | |||
|
15 | from IPython.testing import decorators as testdec | |||
|
16 | ||||
15 | import map as Map |
|
17 | import map as Map | |
16 | from asyncresult import AsyncMapResult |
|
18 | from asyncresult import AsyncMapResult | |
17 |
|
19 | |||
@@ -19,26 +21,32 b' from asyncresult import AsyncMapResult' | |||||
19 | # Decorators |
|
21 | # Decorators | |
20 | #----------------------------------------------------------------------------- |
|
22 | #----------------------------------------------------------------------------- | |
21 |
|
23 | |||
|
24 | @testdec.skip_doctest | |||
22 | def remote(client, bound=True, block=None, targets=None, balanced=None): |
|
25 | def remote(client, bound=True, block=None, targets=None, balanced=None): | |
23 | """Turn a function into a remote function. |
|
26 | """Turn a function into a remote function. | |
24 |
|
27 | |||
25 | This method can be used for map: |
|
28 | This method can be used for map: | |
26 |
|
29 | |||
27 |
|
|
30 | In [1]: @remote(client,block=True) | |
28 | def func(a) |
|
31 | ...: def func(a): | |
|
32 | ...: pass | |||
29 | """ |
|
33 | """ | |
|
34 | ||||
30 | def remote_function(f): |
|
35 | def remote_function(f): | |
31 | return RemoteFunction(client, f, bound, block, targets, balanced) |
|
36 | return RemoteFunction(client, f, bound, block, targets, balanced) | |
32 | return remote_function |
|
37 | return remote_function | |
33 |
|
38 | |||
|
39 | @testdec.skip_doctest | |||
34 | def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None): |
|
40 | def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None): | |
35 | """Turn a function into a parallel remote function. |
|
41 | """Turn a function into a parallel remote function. | |
36 |
|
42 | |||
37 | This method can be used for map: |
|
43 | This method can be used for map: | |
38 |
|
44 | |||
39 |
|
|
45 | In [1]: @parallel(client,block=True) | |
40 | def func(a) |
|
46 | ...: def func(a): | |
|
47 | ...: pass | |||
41 | """ |
|
48 | """ | |
|
49 | ||||
42 | def parallel_function(f): |
|
50 | def parallel_function(f): | |
43 | return ParallelFunction(client, f, dist, bound, block, targets, balanced) |
|
51 | return ParallelFunction(client, f, dist, bound, block, targets, balanced) | |
44 | return parallel_function |
|
52 | return parallel_function |
@@ -104,7 +104,7 b' class Kernel(SessionFactory):' | |||||
104 | self._initial_exec_lines() |
|
104 | self._initial_exec_lines() | |
105 |
|
105 | |||
106 | def _wrap_exception(self, method=None): |
|
106 | def _wrap_exception(self, method=None): | |
107 | e_info = dict(engineid=self.ident, method=method) |
|
107 | e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method) | |
108 | content=wrap_exception(e_info) |
|
108 | content=wrap_exception(e_info) | |
109 | return content |
|
109 | return content | |
110 |
|
110 |
@@ -29,7 +29,6 b' def teardown():' | |||||
29 | p = processes.pop() |
|
29 | p = processes.pop() | |
30 | if p.poll() is None: |
|
30 | if p.poll() is None: | |
31 | try: |
|
31 | try: | |
32 | print 'terminating' |
|
|||
33 | p.terminate() |
|
32 | p.terminate() | |
34 | except Exception, e: |
|
33 | except Exception, e: | |
35 | print e |
|
34 | print e |
@@ -17,7 +17,7 b' from IPython.zmq.parallel.tests import processes,add_engine' | |||||
17 | # simple tasks for use in apply tests |
|
17 | # simple tasks for use in apply tests | |
18 |
|
18 | |||
19 | def segfault(): |
|
19 | def segfault(): | |
20 | """""" |
|
20 | """this will segfault""" | |
21 | import ctypes |
|
21 | import ctypes | |
22 | ctypes.memset(-1,0,1) |
|
22 | ctypes.memset(-1,0,1) | |
23 |
|
23 | |||
@@ -73,9 +73,10 b' class ClusterTestCase(BaseZMQTestCase):' | |||||
73 |
|
73 | |||
74 | def assertRaisesRemote(self, etype, f, *args, **kwargs): |
|
74 | def assertRaisesRemote(self, etype, f, *args, **kwargs): | |
75 | try: |
|
75 | try: | |
76 | f(*args, **kwargs) |
|
76 | try: | |
77 | except error.CompositeError as e: |
|
77 | f(*args, **kwargs) | |
78 | e.raise_exception() |
|
78 | except error.CompositeError as e: | |
|
79 | e.raise_exception() | |||
79 | except error.RemoteError as e: |
|
80 | except error.RemoteError as e: | |
80 | self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(e.ename, etype.__name__)) |
|
81 | self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(e.ename, etype.__name__)) | |
81 | else: |
|
82 | else: | |
@@ -87,10 +88,11 b' class ClusterTestCase(BaseZMQTestCase):' | |||||
87 | self.base_engine_count=len(self.client.ids) |
|
88 | self.base_engine_count=len(self.client.ids) | |
88 | self.engines=[] |
|
89 | self.engines=[] | |
89 |
|
90 | |||
90 | def tearDown(self): |
|
91 | # def tearDown(self): | |
91 | [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ] |
|
92 | # [ e.terminate() for e in filter(lambda e: e.poll() is None, self.engines) ] | |
92 | # while len(self.client.ids) > self.base_engine_count: |
|
93 | # [ e.wait() for e in self.engines ] | |
93 | # time.sleep(.1) |
|
94 | # while len(self.client.ids) > self.base_engine_count: | |
94 | del self.engines |
|
95 | # time.sleep(.1) | |
95 | BaseZMQTestCase.tearDown(self) |
|
96 | # del self.engines | |
|
97 | # BaseZMQTestCase.tearDown(self) | |||
96 | No newline at end of file |
|
98 |
@@ -2,28 +2,35 b' import time' | |||||
2 |
|
2 | |||
3 | import nose.tools as nt |
|
3 | import nose.tools as nt | |
4 |
|
4 | |||
5 |
from IPython.zmq.parallel |
|
5 | from IPython.zmq.parallel import client as clientmod | |
|
6 | from IPython.zmq.parallel import error | |||
|
7 | from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult | |||
6 | from IPython.zmq.parallel.view import LoadBalancedView, DirectView |
|
8 | from IPython.zmq.parallel.view import LoadBalancedView, DirectView | |
7 |
|
9 | |||
8 | from clienttest import ClusterTestCase, segfault |
|
10 | from clienttest import ClusterTestCase, segfault, wait | |
9 |
|
11 | |||
10 | class TestClient(ClusterTestCase): |
|
12 | class TestClient(ClusterTestCase): | |
11 |
|
13 | |||
12 | def test_ids(self): |
|
14 | def test_ids(self): | |
13 |
|
|
15 | n = len(self.client.ids) | |
14 | self.add_engines(3) |
|
16 | self.add_engines(3) | |
15 |
self.assertEquals(len(self.client.ids), |
|
17 | self.assertEquals(len(self.client.ids), n+3) | |
|
18 | self.assertTrue | |||
16 |
|
19 | |||
17 | def test_segfault(self): |
|
20 | def test_segfault(self): | |
|
21 | """test graceful handling of engine death""" | |||
18 | self.add_engines(1) |
|
22 | self.add_engines(1) | |
19 | eid = self.client.ids[-1] |
|
23 | eid = self.client.ids[-1] | |
20 |
self.client |
|
24 | ar = self.client.apply(segfault, block=False) | |
|
25 | self.assertRaisesRemote(error.EngineError, ar.get) | |||
|
26 | eid = ar.engine_id | |||
21 | while eid in self.client.ids: |
|
27 | while eid in self.client.ids: | |
22 | time.sleep(.01) |
|
28 | time.sleep(.01) | |
23 | self.client.spin() |
|
29 | self.client.spin() | |
24 |
|
30 | |||
25 | def test_view_indexing(self): |
|
31 | def test_view_indexing(self): | |
26 | self.add_engines(4) |
|
32 | """test index access for views""" | |
|
33 | self.add_engines(2) | |||
27 | targets = self.client._build_targets('all')[-1] |
|
34 | targets = self.client._build_targets('all')[-1] | |
28 | v = self.client[:] |
|
35 | v = self.client[:] | |
29 | self.assertEquals(v.targets, targets) |
|
36 | self.assertEquals(v.targets, targets) | |
@@ -60,17 +67,30 b' class TestClient(ClusterTestCase):' | |||||
60 |
|
67 | |||
61 | def test_targets(self): |
|
68 | def test_targets(self): | |
62 | """test various valid targets arguments""" |
|
69 | """test various valid targets arguments""" | |
63 | pass |
|
70 | build = self.client._build_targets | |
|
71 | ids = self.client.ids | |||
|
72 | idents,targets = build(None) | |||
|
73 | self.assertEquals(ids, targets) | |||
64 |
|
74 | |||
65 | def test_clear(self): |
|
75 | def test_clear(self): | |
66 | """test clear behavior""" |
|
76 | """test clear behavior""" | |
67 |
|
|
77 | self.add_engines(2) | |
68 |
|
|
78 | self.client.block=True | |
|
79 | self.client.push(dict(a=5)) | |||
|
80 | self.client.pull('a') | |||
|
81 | id0 = self.client.ids[-1] | |||
|
82 | self.client.clear(targets=id0) | |||
|
83 | self.client.pull('a', targets=self.client.ids[:-1]) | |||
|
84 | self.assertRaisesRemote(NameError, self.client.pull, 'a') | |||
|
85 | self.client.clear() | |||
|
86 | for i in self.client.ids: | |||
|
87 | self.assertRaisesRemote(NameError, self.client.pull, 'a', targets=i) | |||
|
88 | ||||
69 |
|
89 | |||
70 | def test_push_pull(self): |
|
90 | def test_push_pull(self): | |
71 | """test pushing and pulling""" |
|
91 | """test pushing and pulling""" | |
72 | data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'}) |
|
92 | data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'}) | |
73 |
self.add_engines( |
|
93 | self.add_engines(2) | |
74 | push = self.client.push |
|
94 | push = self.client.push | |
75 | pull = self.client.pull |
|
95 | pull = self.client.pull | |
76 | self.client.block=True |
|
96 | self.client.block=True | |
@@ -131,4 +151,15 b' class TestClient(ClusterTestCase):' | |||||
131 | v.execute('b=f()') |
|
151 | v.execute('b=f()') | |
132 | self.assertEquals(v['b'], 5) |
|
152 | self.assertEquals(v['b'], 5) | |
133 |
|
153 | |||
|
154 | def test_get_result(self): | |||
|
155 | """test getting results from the Hub.""" | |||
|
156 | c = clientmod.Client(profile='iptest') | |||
|
157 | t = self.client.ids[-1] | |||
|
158 | ar = c.apply(wait, (1,), block=False, targets=t) | |||
|
159 | time.sleep(.25) | |||
|
160 | ahr = self.client.get_result(ar.msg_ids) | |||
|
161 | self.assertTrue(isinstance(ahr, AsyncHubResult)) | |||
|
162 | self.assertEquals(ahr.get(), ar.get()) | |||
|
163 | ar2 = self.client.get_result(ar.msg_ids) | |||
|
164 | self.assertFalse(isinstance(ar2, AsyncHubResult)) | |||
134 | No newline at end of file |
|
165 |
@@ -10,6 +10,7 b'' | |||||
10 | # Imports |
|
10 | # Imports | |
11 | #----------------------------------------------------------------------------- |
|
11 | #----------------------------------------------------------------------------- | |
12 |
|
12 | |||
|
13 | from IPython.testing import decorators as testdec | |||
13 | from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance |
|
14 | from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance | |
14 |
|
15 | |||
15 | from IPython.external.decorator import decorator |
|
16 | from IPython.external.decorator import decorator | |
@@ -330,7 +331,7 b' class View(HasTraits):' | |||||
330 | block = self.block if block is None else block |
|
331 | block = self.block if block is None else block | |
331 | return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced) |
|
332 | return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced) | |
332 |
|
333 | |||
333 |
|
334 | @testdec.skip_doctest | ||
334 | class DirectView(View): |
|
335 | class DirectView(View): | |
335 | """Direct Multiplexer View of one or more engines. |
|
336 | """Direct Multiplexer View of one or more engines. | |
336 |
|
337 | |||
@@ -413,7 +414,7 b' class DirectView(View):' | |||||
413 | return self.client.push(ns, targets=self._targets, block=self.block) |
|
414 | return self.client.push(ns, targets=self._targets, block=self.block) | |
414 |
|
415 | |||
415 | push = update |
|
416 | push = update | |
416 |
|
417 | |||
417 | def get(self, key_s): |
|
418 | def get(self, key_s): | |
418 | """get object(s) by `key_s` from remote namespace |
|
419 | """get object(s) by `key_s` from remote namespace | |
419 | will return one object if it is a key. |
|
420 | will return one object if it is a key. | |
@@ -430,26 +431,24 b' class DirectView(View):' | |||||
430 | block = block if block is not None else self.block |
|
431 | block = block if block is not None else self.block | |
431 | return self.client.pull(key_s, block=block, targets=self._targets) |
|
432 | return self.client.pull(key_s, block=block, targets=self._targets) | |
432 |
|
433 | |||
433 |
def scatter(self, key, seq, dist='b', flatten=False, |
|
434 | def scatter(self, key, seq, dist='b', flatten=False, block=None): | |
434 | """ |
|
435 | """ | |
435 | Partition a Python sequence and send the partitions to a set of engines. |
|
436 | Partition a Python sequence and send the partitions to a set of engines. | |
436 | """ |
|
437 | """ | |
437 | block = block if block is not None else self.block |
|
438 | block = block if block is not None else self.block | |
438 | targets = targets if targets is not None else self._targets |
|
|||
439 |
|
439 | |||
440 | return self.client.scatter(key, seq, dist=dist, flatten=flatten, |
|
440 | return self.client.scatter(key, seq, dist=dist, flatten=flatten, | |
441 | targets=targets, block=block) |
|
441 | targets=self._targets, block=block) | |
442 |
|
442 | |||
443 | @sync_results |
|
443 | @sync_results | |
444 | @save_ids |
|
444 | @save_ids | |
445 |
def gather(self, key, dist='b', |
|
445 | def gather(self, key, dist='b', block=None): | |
446 | """ |
|
446 | """ | |
447 | Gather a partitioned sequence on a set of engines as a single local seq. |
|
447 | Gather a partitioned sequence on a set of engines as a single local seq. | |
448 | """ |
|
448 | """ | |
449 | block = block if block is not None else self.block |
|
449 | block = block if block is not None else self.block | |
450 | targets = targets if targets is not None else self._targets |
|
|||
451 |
|
450 | |||
452 | return self.client.gather(key, dist=dist, targets=targets, block=block) |
|
451 | return self.client.gather(key, dist=dist, targets=self._targets, block=block) | |
453 |
|
452 | |||
454 | def __getitem__(self, key): |
|
453 | def __getitem__(self, key): | |
455 | return self.get(key) |
|
454 | return self.get(key) | |
@@ -496,7 +495,8 b' class DirectView(View):' | |||||
496 | print "You must first load the parallelmagic extension " \ |
|
495 | print "You must first load the parallelmagic extension " \ | |
497 | "by doing '%load_ext parallelmagic'" |
|
496 | "by doing '%load_ext parallelmagic'" | |
498 |
|
497 | |||
499 |
|
498 | |||
|
499 | @testdec.skip_doctest | |||
500 | class LoadBalancedView(View): |
|
500 | class LoadBalancedView(View): | |
501 | """An load-balancing View that only executes via the Task scheduler. |
|
501 | """An load-balancing View that only executes via the Task scheduler. | |
502 |
|
502 |
General Comments 0
You need to be logged in to leave comments.
Login now