##// END OF EJS Templates
Start each test with clear engine namespaces...
MinRK -
Show More
@@ -1,115 +1,117 b''
1 """base class for parallel client tests"""
1 """base class for parallel client tests"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
4 # Copyright (C) 2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 import sys
10 import sys
11 import tempfile
11 import tempfile
12 import time
12 import time
13
13
14 from nose import SkipTest
14 from nose import SkipTest
15
15
16 import zmq
16 import zmq
17 from zmq.tests import BaseZMQTestCase
17 from zmq.tests import BaseZMQTestCase
18
18
19 from IPython.external.decorator import decorator
19 from IPython.external.decorator import decorator
20
20
21 from IPython.parallel import error
21 from IPython.parallel import error
22 from IPython.parallel import Client
22 from IPython.parallel import Client
23 from IPython.parallel.tests import processes,add_engines
23 from IPython.parallel.tests import processes,add_engines
24
24
25 # simple tasks for use in apply tests
25 # simple tasks for use in apply tests
26
26
27 def segfault():
27 def segfault():
28 """this will segfault"""
28 """this will segfault"""
29 import ctypes
29 import ctypes
30 ctypes.memset(-1,0,1)
30 ctypes.memset(-1,0,1)
31
31
32 def wait(n):
32 def wait(n):
33 """sleep for a time"""
33 """sleep for a time"""
34 import time
34 import time
35 time.sleep(n)
35 time.sleep(n)
36 return n
36 return n
37
37
38 def raiser(eclass):
38 def raiser(eclass):
39 """raise an exception"""
39 """raise an exception"""
40 raise eclass()
40 raise eclass()
41
41
42 # test decorator for skipping tests when libraries are unavailable
42 # test decorator for skipping tests when libraries are unavailable
43 def skip_without(*names):
43 def skip_without(*names):
44 """skip a test if some names are not importable"""
44 """skip a test if some names are not importable"""
45 @decorator
45 @decorator
46 def skip_without_names(f, *args, **kwargs):
46 def skip_without_names(f, *args, **kwargs):
47 """decorator to skip tests in the absence of numpy."""
47 """decorator to skip tests in the absence of numpy."""
48 for name in names:
48 for name in names:
49 try:
49 try:
50 __import__(name)
50 __import__(name)
51 except ImportError:
51 except ImportError:
52 raise SkipTest
52 raise SkipTest
53 return f(*args, **kwargs)
53 return f(*args, **kwargs)
54 return skip_without_names
54 return skip_without_names
55
55
56 class ClusterTestCase(BaseZMQTestCase):
56 class ClusterTestCase(BaseZMQTestCase):
57
57
58 def add_engines(self, n=1, block=True):
58 def add_engines(self, n=1, block=True):
59 """add multiple engines to our cluster"""
59 """add multiple engines to our cluster"""
60 self.engines.extend(add_engines(n))
60 self.engines.extend(add_engines(n))
61 if block:
61 if block:
62 self.wait_on_engines()
62 self.wait_on_engines()
63
63
64 def wait_on_engines(self, timeout=5):
64 def wait_on_engines(self, timeout=5):
65 """wait for our engines to connect."""
65 """wait for our engines to connect."""
66 n = len(self.engines)+self.base_engine_count
66 n = len(self.engines)+self.base_engine_count
67 tic = time.time()
67 tic = time.time()
68 while time.time()-tic < timeout and len(self.client.ids) < n:
68 while time.time()-tic < timeout and len(self.client.ids) < n:
69 time.sleep(0.1)
69 time.sleep(0.1)
70
70
71 assert not len(self.client.ids) < n, "waiting for engines timed out"
71 assert not len(self.client.ids) < n, "waiting for engines timed out"
72
72
73 def connect_client(self):
73 def connect_client(self):
74 """connect a client with my Context, and track its sockets for cleanup"""
74 """connect a client with my Context, and track its sockets for cleanup"""
75 c = Client(profile='iptest', context=self.context)
75 c = Client(profile='iptest', context=self.context)
76 for name in filter(lambda n:n.endswith('socket'), dir(c)):
76 for name in filter(lambda n:n.endswith('socket'), dir(c)):
77 s = getattr(c, name)
77 s = getattr(c, name)
78 s.setsockopt(zmq.LINGER, 0)
78 s.setsockopt(zmq.LINGER, 0)
79 self.sockets.append(s)
79 self.sockets.append(s)
80 return c
80 return c
81
81
82 def assertRaisesRemote(self, etype, f, *args, **kwargs):
82 def assertRaisesRemote(self, etype, f, *args, **kwargs):
83 try:
83 try:
84 try:
84 try:
85 f(*args, **kwargs)
85 f(*args, **kwargs)
86 except error.CompositeError as e:
86 except error.CompositeError as e:
87 e.raise_exception()
87 e.raise_exception()
88 except error.RemoteError as e:
88 except error.RemoteError as e:
89 self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(e.ename, etype.__name__))
89 self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(e.ename, etype.__name__))
90 else:
90 else:
91 self.fail("should have raised a RemoteError")
91 self.fail("should have raised a RemoteError")
92
92
93 def setUp(self):
93 def setUp(self):
94 BaseZMQTestCase.setUp(self)
94 BaseZMQTestCase.setUp(self)
95 self.client = self.connect_client()
95 self.client = self.connect_client()
96 # start every test with clean engine namespaces:
97 self.client.clear(block=True)
96 self.base_engine_count=len(self.client.ids)
98 self.base_engine_count=len(self.client.ids)
97 self.engines=[]
99 self.engines=[]
98
100
99 def tearDown(self):
101 def tearDown(self):
100 # self.client.clear(block=True)
102 # self.client.clear(block=True)
101 # close fds:
103 # close fds:
102 for e in filter(lambda e: e.poll() is not None, processes):
104 for e in filter(lambda e: e.poll() is not None, processes):
103 processes.remove(e)
105 processes.remove(e)
104
106
105 # allow flushing of incoming messages to prevent crash on socket close
107 # allow flushing of incoming messages to prevent crash on socket close
106 self.client.wait(timeout=2)
108 self.client.wait(timeout=2)
107 # time.sleep(2)
109 # time.sleep(2)
108 self.client.spin()
110 self.client.spin()
109 self.client.close()
111 self.client.close()
110 BaseZMQTestCase.tearDown(self)
112 BaseZMQTestCase.tearDown(self)
111 # this will be redundant when pyzmq merges PR #88
113 # this will be redundant when pyzmq merges PR #88
112 # self.context.term()
114 # self.context.term()
113 # print tempfile.TemporaryFile().fileno(),
115 # print tempfile.TemporaryFile().fileno(),
114 # sys.stdout.flush()
116 # sys.stdout.flush()
115 No newline at end of file
117
@@ -1,148 +1,148 b''
1 """Tests for parallel client.py"""
1 """Tests for parallel client.py"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
4 # Copyright (C) 2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import time
14 import time
15 from tempfile import mktemp
15 from tempfile import mktemp
16
16
17 import zmq
17 import zmq
18
18
19 from IPython.parallel.client import client as clientmod
19 from IPython.parallel.client import client as clientmod
20 from IPython.parallel import error
20 from IPython.parallel import error
21 from IPython.parallel import AsyncResult, AsyncHubResult
21 from IPython.parallel import AsyncResult, AsyncHubResult
22 from IPython.parallel import LoadBalancedView, DirectView
22 from IPython.parallel import LoadBalancedView, DirectView
23
23
24 from clienttest import ClusterTestCase, segfault, wait, add_engines
24 from clienttest import ClusterTestCase, segfault, wait, add_engines
25
25
26 def setup():
26 def setup():
27 add_engines(4)
27 add_engines(4)
28
28
29 class TestClient(ClusterTestCase):
29 class TestClient(ClusterTestCase):
30
30
31 def test_ids(self):
31 def test_ids(self):
32 n = len(self.client.ids)
32 n = len(self.client.ids)
33 self.add_engines(3)
33 self.add_engines(3)
34 self.assertEquals(len(self.client.ids), n+3)
34 self.assertEquals(len(self.client.ids), n+3)
35
35
36 def test_view_indexing(self):
36 def test_view_indexing(self):
37 """test index access for views"""
37 """test index access for views"""
38 self.add_engines(2)
38 self.add_engines(2)
39 targets = self.client._build_targets('all')[-1]
39 targets = self.client._build_targets('all')[-1]
40 v = self.client[:]
40 v = self.client[:]
41 self.assertEquals(v.targets, targets)
41 self.assertEquals(v.targets, targets)
42 t = self.client.ids[2]
42 t = self.client.ids[2]
43 v = self.client[t]
43 v = self.client[t]
44 self.assert_(isinstance(v, DirectView))
44 self.assert_(isinstance(v, DirectView))
45 self.assertEquals(v.targets, t)
45 self.assertEquals(v.targets, t)
46 t = self.client.ids[2:4]
46 t = self.client.ids[2:4]
47 v = self.client[t]
47 v = self.client[t]
48 self.assert_(isinstance(v, DirectView))
48 self.assert_(isinstance(v, DirectView))
49 self.assertEquals(v.targets, t)
49 self.assertEquals(v.targets, t)
50 v = self.client[::2]
50 v = self.client[::2]
51 self.assert_(isinstance(v, DirectView))
51 self.assert_(isinstance(v, DirectView))
52 self.assertEquals(v.targets, targets[::2])
52 self.assertEquals(v.targets, targets[::2])
53 v = self.client[1::3]
53 v = self.client[1::3]
54 self.assert_(isinstance(v, DirectView))
54 self.assert_(isinstance(v, DirectView))
55 self.assertEquals(v.targets, targets[1::3])
55 self.assertEquals(v.targets, targets[1::3])
56 v = self.client[:-3]
56 v = self.client[:-3]
57 self.assert_(isinstance(v, DirectView))
57 self.assert_(isinstance(v, DirectView))
58 self.assertEquals(v.targets, targets[:-3])
58 self.assertEquals(v.targets, targets[:-3])
59 v = self.client[-1]
59 v = self.client[-1]
60 self.assert_(isinstance(v, DirectView))
60 self.assert_(isinstance(v, DirectView))
61 self.assertEquals(v.targets, targets[-1])
61 self.assertEquals(v.targets, targets[-1])
62 self.assertRaises(TypeError, lambda : self.client[None])
62 self.assertRaises(TypeError, lambda : self.client[None])
63
63
64 def test_lbview_targets(self):
64 def test_lbview_targets(self):
65 """test load_balanced_view targets"""
65 """test load_balanced_view targets"""
66 v = self.client.load_balanced_view()
66 v = self.client.load_balanced_view()
67 self.assertEquals(v.targets, None)
67 self.assertEquals(v.targets, None)
68 v = self.client.load_balanced_view(-1)
68 v = self.client.load_balanced_view(-1)
69 self.assertEquals(v.targets, [self.client.ids[-1]])
69 self.assertEquals(v.targets, [self.client.ids[-1]])
70 v = self.client.load_balanced_view('all')
70 v = self.client.load_balanced_view('all')
71 self.assertEquals(v.targets, self.client.ids)
71 self.assertEquals(v.targets, self.client.ids)
72
72
73 def test_targets(self):
73 def test_targets(self):
74 """test various valid targets arguments"""
74 """test various valid targets arguments"""
75 build = self.client._build_targets
75 build = self.client._build_targets
76 ids = self.client.ids
76 ids = self.client.ids
77 idents,targets = build(None)
77 idents,targets = build(None)
78 self.assertEquals(ids, targets)
78 self.assertEquals(ids, targets)
79
79
80 def test_clear(self):
80 def test_clear(self):
81 """test clear behavior"""
81 """test clear behavior"""
82 # self.add_engines(2)
82 # self.add_engines(2)
83 v = self.client[:]
83 v = self.client[:]
84 v.block=True
84 v.block=True
85 v.push(dict(a=5))
85 v.push(dict(a=5))
86 v.pull('a')
86 v.pull('a')
87 id0 = self.client.ids[-1]
87 id0 = self.client.ids[-1]
88 self.client.clear(targets=id0)
88 self.client.clear(targets=id0, block=True)
89 self.client[:-1].pull('a')
89 a = self.client[:-1].get('a')
90 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
90 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
91 self.client.clear(block=True)
91 self.client.clear(block=True)
92 for i in self.client.ids:
92 for i in self.client.ids:
93 # print i
93 # print i
94 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
94 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
95
95
96 def test_get_result(self):
96 def test_get_result(self):
97 """test getting results from the Hub."""
97 """test getting results from the Hub."""
98 c = clientmod.Client(profile='iptest')
98 c = clientmod.Client(profile='iptest')
99 # self.add_engines(1)
99 # self.add_engines(1)
100 t = c.ids[-1]
100 t = c.ids[-1]
101 ar = c[t].apply_async(wait, 1)
101 ar = c[t].apply_async(wait, 1)
102 # give the monitor time to notice the message
102 # give the monitor time to notice the message
103 time.sleep(.25)
103 time.sleep(.25)
104 ahr = self.client.get_result(ar.msg_ids)
104 ahr = self.client.get_result(ar.msg_ids)
105 self.assertTrue(isinstance(ahr, AsyncHubResult))
105 self.assertTrue(isinstance(ahr, AsyncHubResult))
106 self.assertEquals(ahr.get(), ar.get())
106 self.assertEquals(ahr.get(), ar.get())
107 ar2 = self.client.get_result(ar.msg_ids)
107 ar2 = self.client.get_result(ar.msg_ids)
108 self.assertFalse(isinstance(ar2, AsyncHubResult))
108 self.assertFalse(isinstance(ar2, AsyncHubResult))
109 c.close()
109 c.close()
110
110
111 def test_ids_list(self):
111 def test_ids_list(self):
112 """test client.ids"""
112 """test client.ids"""
113 # self.add_engines(2)
113 # self.add_engines(2)
114 ids = self.client.ids
114 ids = self.client.ids
115 self.assertEquals(ids, self.client._ids)
115 self.assertEquals(ids, self.client._ids)
116 self.assertFalse(ids is self.client._ids)
116 self.assertFalse(ids is self.client._ids)
117 ids.remove(ids[-1])
117 ids.remove(ids[-1])
118 self.assertNotEquals(ids, self.client._ids)
118 self.assertNotEquals(ids, self.client._ids)
119
119
120 def test_queue_status(self):
120 def test_queue_status(self):
121 # self.addEngine(4)
121 # self.addEngine(4)
122 ids = self.client.ids
122 ids = self.client.ids
123 id0 = ids[0]
123 id0 = ids[0]
124 qs = self.client.queue_status(targets=id0)
124 qs = self.client.queue_status(targets=id0)
125 self.assertTrue(isinstance(qs, dict))
125 self.assertTrue(isinstance(qs, dict))
126 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
126 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
127 allqs = self.client.queue_status()
127 allqs = self.client.queue_status()
128 self.assertTrue(isinstance(allqs, dict))
128 self.assertTrue(isinstance(allqs, dict))
129 self.assertEquals(sorted(allqs.keys()), sorted(self.client.ids + ['unassigned']))
129 self.assertEquals(sorted(allqs.keys()), sorted(self.client.ids + ['unassigned']))
130 unassigned = allqs.pop('unassigned')
130 unassigned = allqs.pop('unassigned')
131 for eid,qs in allqs.items():
131 for eid,qs in allqs.items():
132 self.assertTrue(isinstance(qs, dict))
132 self.assertTrue(isinstance(qs, dict))
133 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
133 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
134
134
135 def test_shutdown(self):
135 def test_shutdown(self):
136 # self.addEngine(4)
136 # self.addEngine(4)
137 ids = self.client.ids
137 ids = self.client.ids
138 id0 = ids[0]
138 id0 = ids[0]
139 self.client.shutdown(id0, block=True)
139 self.client.shutdown(id0, block=True)
140 while id0 in self.client.ids:
140 while id0 in self.client.ids:
141 time.sleep(0.1)
141 time.sleep(0.1)
142 self.client.spin()
142 self.client.spin()
143
143
144 self.assertRaises(IndexError, lambda : self.client[id0])
144 self.assertRaises(IndexError, lambda : self.client[id0])
145
145
146 def test_result_status(self):
146 def test_result_status(self):
147 pass
147 pass
148 # to be written
148 # to be written
@@ -1,413 +1,414 b''
1 """test View objects"""
1 """test View objects"""
2 #-------------------------------------------------------------------------------
2 #-------------------------------------------------------------------------------
3 # Copyright (C) 2011 The IPython Development Team
3 # Copyright (C) 2011 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-------------------------------------------------------------------------------
7 #-------------------------------------------------------------------------------
8
8
9 #-------------------------------------------------------------------------------
9 #-------------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-------------------------------------------------------------------------------
11 #-------------------------------------------------------------------------------
12
12
13 import sys
13 import sys
14 import time
14 import time
15 from tempfile import mktemp
15 from tempfile import mktemp
16 from StringIO import StringIO
16 from StringIO import StringIO
17
17
18 import zmq
18 import zmq
19
19
20 from IPython import parallel as pmod
20 from IPython import parallel as pmod
21 from IPython.parallel import error
21 from IPython.parallel import error
22 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
22 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
23 from IPython.parallel import LoadBalancedView, DirectView
23 from IPython.parallel import LoadBalancedView, DirectView
24 from IPython.parallel.util import interactive
24 from IPython.parallel.util import interactive
25
25
26 from IPython.parallel.tests import add_engines
26 from IPython.parallel.tests import add_engines
27
27
28 from .clienttest import ClusterTestCase, segfault, wait, skip_without
28 from .clienttest import ClusterTestCase, segfault, wait, skip_without
29
29
30 def setup():
30 def setup():
31 add_engines(3)
31 add_engines(3)
32
32
33 class TestView(ClusterTestCase):
33 class TestView(ClusterTestCase):
34
34
35 def test_segfault_task(self):
35 def test_segfault_task(self):
36 """test graceful handling of engine death (balanced)"""
36 """test graceful handling of engine death (balanced)"""
37 # self.add_engines(1)
37 # self.add_engines(1)
38 ar = self.client[-1].apply_async(segfault)
38 ar = self.client[-1].apply_async(segfault)
39 self.assertRaisesRemote(error.EngineError, ar.get)
39 self.assertRaisesRemote(error.EngineError, ar.get)
40 eid = ar.engine_id
40 eid = ar.engine_id
41 while eid in self.client.ids:
41 while eid in self.client.ids:
42 time.sleep(.01)
42 time.sleep(.01)
43 self.client.spin()
43 self.client.spin()
44
44
45 def test_segfault_mux(self):
45 def test_segfault_mux(self):
46 """test graceful handling of engine death (direct)"""
46 """test graceful handling of engine death (direct)"""
47 # self.add_engines(1)
47 # self.add_engines(1)
48 eid = self.client.ids[-1]
48 eid = self.client.ids[-1]
49 ar = self.client[eid].apply_async(segfault)
49 ar = self.client[eid].apply_async(segfault)
50 self.assertRaisesRemote(error.EngineError, ar.get)
50 self.assertRaisesRemote(error.EngineError, ar.get)
51 eid = ar.engine_id
51 eid = ar.engine_id
52 while eid in self.client.ids:
52 while eid in self.client.ids:
53 time.sleep(.01)
53 time.sleep(.01)
54 self.client.spin()
54 self.client.spin()
55
55
56 def test_push_pull(self):
56 def test_push_pull(self):
57 """test pushing and pulling"""
57 """test pushing and pulling"""
58 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
58 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
59 t = self.client.ids[-1]
59 t = self.client.ids[-1]
60 v = self.client[t]
60 v = self.client[t]
61 push = v.push
61 push = v.push
62 pull = v.pull
62 pull = v.pull
63 v.block=True
63 v.block=True
64 nengines = len(self.client)
64 nengines = len(self.client)
65 push({'data':data})
65 push({'data':data})
66 d = pull('data')
66 d = pull('data')
67 self.assertEquals(d, data)
67 self.assertEquals(d, data)
68 self.client[:].push({'data':data})
68 self.client[:].push({'data':data})
69 d = self.client[:].pull('data', block=True)
69 d = self.client[:].pull('data', block=True)
70 self.assertEquals(d, nengines*[data])
70 self.assertEquals(d, nengines*[data])
71 ar = push({'data':data}, block=False)
71 ar = push({'data':data}, block=False)
72 self.assertTrue(isinstance(ar, AsyncResult))
72 self.assertTrue(isinstance(ar, AsyncResult))
73 r = ar.get()
73 r = ar.get()
74 ar = self.client[:].pull('data', block=False)
74 ar = self.client[:].pull('data', block=False)
75 self.assertTrue(isinstance(ar, AsyncResult))
75 self.assertTrue(isinstance(ar, AsyncResult))
76 r = ar.get()
76 r = ar.get()
77 self.assertEquals(r, nengines*[data])
77 self.assertEquals(r, nengines*[data])
78 self.client[:].push(dict(a=10,b=20))
78 self.client[:].push(dict(a=10,b=20))
79 r = self.client[:].pull(('a','b'), block=True)
79 r = self.client[:].pull(('a','b'), block=True)
80 self.assertEquals(r, nengines*[[10,20]])
80 self.assertEquals(r, nengines*[[10,20]])
81
81
82 def test_push_pull_function(self):
82 def test_push_pull_function(self):
83 "test pushing and pulling functions"
83 "test pushing and pulling functions"
84 def testf(x):
84 def testf(x):
85 return 2.0*x
85 return 2.0*x
86
86
87 t = self.client.ids[-1]
87 t = self.client.ids[-1]
88 v = self.client[t]
88 v = self.client[t]
89 v.block=True
89 v.block=True
90 push = v.push
90 push = v.push
91 pull = v.pull
91 pull = v.pull
92 execute = v.execute
92 execute = v.execute
93 push({'testf':testf})
93 push({'testf':testf})
94 r = pull('testf')
94 r = pull('testf')
95 self.assertEqual(r(1.0), testf(1.0))
95 self.assertEqual(r(1.0), testf(1.0))
96 execute('r = testf(10)')
96 execute('r = testf(10)')
97 r = pull('r')
97 r = pull('r')
98 self.assertEquals(r, testf(10))
98 self.assertEquals(r, testf(10))
99 ar = self.client[:].push({'testf':testf}, block=False)
99 ar = self.client[:].push({'testf':testf}, block=False)
100 ar.get()
100 ar.get()
101 ar = self.client[:].pull('testf', block=False)
101 ar = self.client[:].pull('testf', block=False)
102 rlist = ar.get()
102 rlist = ar.get()
103 for r in rlist:
103 for r in rlist:
104 self.assertEqual(r(1.0), testf(1.0))
104 self.assertEqual(r(1.0), testf(1.0))
105 execute("def g(x): return x*x")
105 execute("def g(x): return x*x")
106 r = pull(('testf','g'))
106 r = pull(('testf','g'))
107 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
107 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
108
108
109 def test_push_function_globals(self):
109 def test_push_function_globals(self):
110 """test that pushed functions have access to globals"""
110 """test that pushed functions have access to globals"""
111 @interactive
111 @interactive
112 def geta():
112 def geta():
113 return a
113 return a
114 # self.add_engines(1)
114 # self.add_engines(1)
115 v = self.client[-1]
115 v = self.client[-1]
116 v.block=True
116 v.block=True
117 v['f'] = geta
117 v['f'] = geta
118 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
118 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
119 v.execute('a=5')
119 v.execute('a=5')
120 v.execute('b=f()')
120 v.execute('b=f()')
121 self.assertEquals(v['b'], 5)
121 self.assertEquals(v['b'], 5)
122
122
123 def test_push_function_defaults(self):
123 def test_push_function_defaults(self):
124 """test that pushed functions preserve default args"""
124 """test that pushed functions preserve default args"""
125 def echo(a=10):
125 def echo(a=10):
126 return a
126 return a
127 v = self.client[-1]
127 v = self.client[-1]
128 v.block=True
128 v.block=True
129 v['f'] = echo
129 v['f'] = echo
130 v.execute('b=f()')
130 v.execute('b=f()')
131 self.assertEquals(v['b'], 10)
131 self.assertEquals(v['b'], 10)
132
132
133 def test_get_result(self):
133 def test_get_result(self):
134 """test getting results from the Hub."""
134 """test getting results from the Hub."""
135 c = pmod.Client(profile='iptest')
135 c = pmod.Client(profile='iptest')
136 # self.add_engines(1)
136 # self.add_engines(1)
137 t = c.ids[-1]
137 t = c.ids[-1]
138 v = c[t]
138 v = c[t]
139 v2 = self.client[t]
139 v2 = self.client[t]
140 ar = v.apply_async(wait, 1)
140 ar = v.apply_async(wait, 1)
141 # give the monitor time to notice the message
141 # give the monitor time to notice the message
142 time.sleep(.25)
142 time.sleep(.25)
143 ahr = v2.get_result(ar.msg_ids)
143 ahr = v2.get_result(ar.msg_ids)
144 self.assertTrue(isinstance(ahr, AsyncHubResult))
144 self.assertTrue(isinstance(ahr, AsyncHubResult))
145 self.assertEquals(ahr.get(), ar.get())
145 self.assertEquals(ahr.get(), ar.get())
146 ar2 = v2.get_result(ar.msg_ids)
146 ar2 = v2.get_result(ar.msg_ids)
147 self.assertFalse(isinstance(ar2, AsyncHubResult))
147 self.assertFalse(isinstance(ar2, AsyncHubResult))
148 c.spin()
148 c.spin()
149 c.close()
149 c.close()
150
150
151 def test_run_newline(self):
151 def test_run_newline(self):
152 """test that run appends newline to files"""
152 """test that run appends newline to files"""
153 tmpfile = mktemp()
153 tmpfile = mktemp()
154 with open(tmpfile, 'w') as f:
154 with open(tmpfile, 'w') as f:
155 f.write("""def g():
155 f.write("""def g():
156 return 5
156 return 5
157 """)
157 """)
158 v = self.client[-1]
158 v = self.client[-1]
159 v.run(tmpfile, block=True)
159 v.run(tmpfile, block=True)
160 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
160 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
161
161
162 def test_apply_tracked(self):
162 def test_apply_tracked(self):
163 """test tracking for apply"""
163 """test tracking for apply"""
164 # self.add_engines(1)
164 # self.add_engines(1)
165 t = self.client.ids[-1]
165 t = self.client.ids[-1]
166 v = self.client[t]
166 v = self.client[t]
167 v.block=False
167 v.block=False
168 def echo(n=1024*1024, **kwargs):
168 def echo(n=1024*1024, **kwargs):
169 with v.temp_flags(**kwargs):
169 with v.temp_flags(**kwargs):
170 return v.apply(lambda x: x, 'x'*n)
170 return v.apply(lambda x: x, 'x'*n)
171 ar = echo(1, track=False)
171 ar = echo(1, track=False)
172 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
172 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
173 self.assertTrue(ar.sent)
173 self.assertTrue(ar.sent)
174 ar = echo(track=True)
174 ar = echo(track=True)
175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
176 self.assertEquals(ar.sent, ar._tracker.done)
176 self.assertEquals(ar.sent, ar._tracker.done)
177 ar._tracker.wait()
177 ar._tracker.wait()
178 self.assertTrue(ar.sent)
178 self.assertTrue(ar.sent)
179
179
180 def test_push_tracked(self):
180 def test_push_tracked(self):
181 t = self.client.ids[-1]
181 t = self.client.ids[-1]
182 ns = dict(x='x'*1024*1024)
182 ns = dict(x='x'*1024*1024)
183 v = self.client[t]
183 v = self.client[t]
184 ar = v.push(ns, block=False, track=False)
184 ar = v.push(ns, block=False, track=False)
185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
186 self.assertTrue(ar.sent)
186 self.assertTrue(ar.sent)
187
187
188 ar = v.push(ns, block=False, track=True)
188 ar = v.push(ns, block=False, track=True)
189 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
190 self.assertEquals(ar.sent, ar._tracker.done)
190 self.assertEquals(ar.sent, ar._tracker.done)
191 ar._tracker.wait()
191 ar._tracker.wait()
192 self.assertTrue(ar.sent)
192 self.assertTrue(ar.sent)
193 ar.get()
193 ar.get()
194
194
195 def test_scatter_tracked(self):
195 def test_scatter_tracked(self):
196 t = self.client.ids
196 t = self.client.ids
197 x='x'*1024*1024
197 x='x'*1024*1024
198 ar = self.client[t].scatter('x', x, block=False, track=False)
198 ar = self.client[t].scatter('x', x, block=False, track=False)
199 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
199 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
200 self.assertTrue(ar.sent)
200 self.assertTrue(ar.sent)
201
201
202 ar = self.client[t].scatter('x', x, block=False, track=True)
202 ar = self.client[t].scatter('x', x, block=False, track=True)
203 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
204 self.assertEquals(ar.sent, ar._tracker.done)
204 self.assertEquals(ar.sent, ar._tracker.done)
205 ar._tracker.wait()
205 ar._tracker.wait()
206 self.assertTrue(ar.sent)
206 self.assertTrue(ar.sent)
207 ar.get()
207 ar.get()
208
208
209 def test_remote_reference(self):
209 def test_remote_reference(self):
210 v = self.client[-1]
210 v = self.client[-1]
211 v['a'] = 123
211 v['a'] = 123
212 ra = pmod.Reference('a')
212 ra = pmod.Reference('a')
213 b = v.apply_sync(lambda x: x, ra)
213 b = v.apply_sync(lambda x: x, ra)
214 self.assertEquals(b, 123)
214 self.assertEquals(b, 123)
215
215
216
216
217 def test_scatter_gather(self):
217 def test_scatter_gather(self):
218 view = self.client[:]
218 view = self.client[:]
219 seq1 = range(16)
219 seq1 = range(16)
220 view.scatter('a', seq1)
220 view.scatter('a', seq1)
221 seq2 = view.gather('a', block=True)
221 seq2 = view.gather('a', block=True)
222 self.assertEquals(seq2, seq1)
222 self.assertEquals(seq2, seq1)
223 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
223 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
224
224
225 @skip_without('numpy')
225 @skip_without('numpy')
226 def test_scatter_gather_numpy(self):
226 def test_scatter_gather_numpy(self):
227 import numpy
227 import numpy
228 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
228 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
229 view = self.client[:]
229 view = self.client[:]
230 a = numpy.arange(64)
230 a = numpy.arange(64)
231 view.scatter('a', a)
231 view.scatter('a', a)
232 b = view.gather('a', block=True)
232 b = view.gather('a', block=True)
233 assert_array_equal(b, a)
233 assert_array_equal(b, a)
234
234
235 def test_map(self):
235 def test_map(self):
236 view = self.client[:]
236 view = self.client[:]
237 def f(x):
237 def f(x):
238 return x**2
238 return x**2
239 data = range(16)
239 data = range(16)
240 r = view.map_sync(f, data)
240 r = view.map_sync(f, data)
241 self.assertEquals(r, map(f, data))
241 self.assertEquals(r, map(f, data))
242
242
243 def test_scatterGatherNonblocking(self):
243 def test_scatterGatherNonblocking(self):
244 data = range(16)
244 data = range(16)
245 view = self.client[:]
245 view = self.client[:]
246 view.scatter('a', data, block=False)
246 view.scatter('a', data, block=False)
247 ar = view.gather('a', block=False)
247 ar = view.gather('a', block=False)
248 self.assertEquals(ar.get(), data)
248 self.assertEquals(ar.get(), data)
249
249
250 @skip_without('numpy')
250 @skip_without('numpy')
251 def test_scatter_gather_numpy_nonblocking(self):
251 def test_scatter_gather_numpy_nonblocking(self):
252 import numpy
252 import numpy
253 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
253 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
254 a = numpy.arange(64)
254 a = numpy.arange(64)
255 view = self.client[:]
255 view = self.client[:]
256 ar = view.scatter('a', a, block=False)
256 ar = view.scatter('a', a, block=False)
257 self.assertTrue(isinstance(ar, AsyncResult))
257 self.assertTrue(isinstance(ar, AsyncResult))
258 amr = view.gather('a', block=False)
258 amr = view.gather('a', block=False)
259 self.assertTrue(isinstance(amr, AsyncMapResult))
259 self.assertTrue(isinstance(amr, AsyncMapResult))
260 assert_array_equal(amr.get(), a)
260 assert_array_equal(amr.get(), a)
261
261
262 def test_execute(self):
262 def test_execute(self):
263 view = self.client[:]
263 view = self.client[:]
264 # self.client.debug=True
264 # self.client.debug=True
265 execute = view.execute
265 execute = view.execute
266 ar = execute('c=30', block=False)
266 ar = execute('c=30', block=False)
267 self.assertTrue(isinstance(ar, AsyncResult))
267 self.assertTrue(isinstance(ar, AsyncResult))
268 ar = execute('d=[0,1,2]', block=False)
268 ar = execute('d=[0,1,2]', block=False)
269 self.client.wait(ar, 1)
269 self.client.wait(ar, 1)
270 self.assertEquals(len(ar.get()), len(self.client))
270 self.assertEquals(len(ar.get()), len(self.client))
271 for c in view['c']:
271 for c in view['c']:
272 self.assertEquals(c, 30)
272 self.assertEquals(c, 30)
273
273
274 def test_abort(self):
274 def test_abort(self):
275 view = self.client[-1]
275 view = self.client[-1]
276 ar = view.execute('import time; time.sleep(0.25)', block=False)
276 ar = view.execute('import time; time.sleep(0.25)', block=False)
277 ar2 = view.apply_async(lambda : 2)
277 ar2 = view.apply_async(lambda : 2)
278 ar3 = view.apply_async(lambda : 3)
278 ar3 = view.apply_async(lambda : 3)
279 view.abort(ar2)
279 view.abort(ar2)
280 view.abort(ar3.msg_ids)
280 view.abort(ar3.msg_ids)
281 self.assertRaises(error.TaskAborted, ar2.get)
281 self.assertRaises(error.TaskAborted, ar2.get)
282 self.assertRaises(error.TaskAborted, ar3.get)
282 self.assertRaises(error.TaskAborted, ar3.get)
283
283
284 def test_temp_flags(self):
284 def test_temp_flags(self):
285 view = self.client[-1]
285 view = self.client[-1]
286 view.block=True
286 view.block=True
287 with view.temp_flags(block=False):
287 with view.temp_flags(block=False):
288 self.assertFalse(view.block)
288 self.assertFalse(view.block)
289 self.assertTrue(view.block)
289 self.assertTrue(view.block)
290
290
291 def test_importer(self):
291 def test_importer(self):
292 view = self.client[-1]
292 view = self.client[-1]
293 view.clear(block=True)
293 view.clear(block=True)
294 with view.importer:
294 with view.importer:
295 import re
295 import re
296
296
297 @interactive
297 @interactive
298 def findall(pat, s):
298 def findall(pat, s):
299 # this globals() step isn't necessary in real code
299 # this globals() step isn't necessary in real code
300 # only to prevent a closure in the test
300 # only to prevent a closure in the test
301 return globals()['re'].findall(pat, s)
301 re = globals()['re']
302 return re.findall(pat, s)
302
303
303 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
304 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
304
305
305 # parallel magic tests
306 # parallel magic tests
306
307
307 def test_magic_px_blocking(self):
308 def test_magic_px_blocking(self):
308 ip = get_ipython()
309 ip = get_ipython()
309 v = self.client[-1]
310 v = self.client[-1]
310 v.activate()
311 v.activate()
311 v.block=True
312 v.block=True
312
313
313 ip.magic_px('a=5')
314 ip.magic_px('a=5')
314 self.assertEquals(v['a'], 5)
315 self.assertEquals(v['a'], 5)
315 ip.magic_px('a=10')
316 ip.magic_px('a=10')
316 self.assertEquals(v['a'], 10)
317 self.assertEquals(v['a'], 10)
317 sio = StringIO()
318 sio = StringIO()
318 savestdout = sys.stdout
319 savestdout = sys.stdout
319 sys.stdout = sio
320 sys.stdout = sio
320 ip.magic_px('print a')
321 ip.magic_px('print a')
321 sys.stdout = savestdout
322 sys.stdout = savestdout
322 sio.read()
323 sio.read()
323 self.assertTrue('[stdout:%i]'%v.targets in sio.buf)
324 self.assertTrue('[stdout:%i]'%v.targets in sio.buf)
324 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
325 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
325
326
326 def test_magic_px_nonblocking(self):
327 def test_magic_px_nonblocking(self):
327 ip = get_ipython()
328 ip = get_ipython()
328 v = self.client[-1]
329 v = self.client[-1]
329 v.activate()
330 v.activate()
330 v.block=False
331 v.block=False
331
332
332 ip.magic_px('a=5')
333 ip.magic_px('a=5')
333 self.assertEquals(v['a'], 5)
334 self.assertEquals(v['a'], 5)
334 ip.magic_px('a=10')
335 ip.magic_px('a=10')
335 self.assertEquals(v['a'], 10)
336 self.assertEquals(v['a'], 10)
336 sio = StringIO()
337 sio = StringIO()
337 savestdout = sys.stdout
338 savestdout = sys.stdout
338 sys.stdout = sio
339 sys.stdout = sio
339 ip.magic_px('print a')
340 ip.magic_px('print a')
340 sys.stdout = savestdout
341 sys.stdout = savestdout
341 sio.read()
342 sio.read()
342 self.assertFalse('[stdout:%i]'%v.targets in sio.buf)
343 self.assertFalse('[stdout:%i]'%v.targets in sio.buf)
343 ip.magic_px('1/0')
344 ip.magic_px('1/0')
344 ar = v.get_result(-1)
345 ar = v.get_result(-1)
345 self.assertRaisesRemote(ZeroDivisionError, ar.get)
346 self.assertRaisesRemote(ZeroDivisionError, ar.get)
346
347
347 def test_magic_autopx_blocking(self):
348 def test_magic_autopx_blocking(self):
348 ip = get_ipython()
349 ip = get_ipython()
349 v = self.client[-1]
350 v = self.client[-1]
350 v.activate()
351 v.activate()
351 v.block=True
352 v.block=True
352
353
353 sio = StringIO()
354 sio = StringIO()
354 savestdout = sys.stdout
355 savestdout = sys.stdout
355 sys.stdout = sio
356 sys.stdout = sio
356 ip.magic_autopx()
357 ip.magic_autopx()
357 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
358 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
358 ip.run_cell('print b')
359 ip.run_cell('print b')
359 ip.run_cell("b/c")
360 ip.run_cell("b/c")
360 ip.run_code(compile('b*=2', '', 'single'))
361 ip.run_code(compile('b*=2', '', 'single'))
361 ip.magic_autopx()
362 ip.magic_autopx()
362 sys.stdout = savestdout
363 sys.stdout = savestdout
363 sio.read()
364 sio.read()
364 output = sio.buf.strip()
365 output = sio.buf.strip()
365 self.assertTrue(output.startswith('%autopx enabled'))
366 self.assertTrue(output.startswith('%autopx enabled'))
366 self.assertTrue(output.endswith('%autopx disabled'))
367 self.assertTrue(output.endswith('%autopx disabled'))
367 self.assertTrue('RemoteError: ZeroDivisionError' in output)
368 self.assertTrue('RemoteError: ZeroDivisionError' in output)
368 ar = v.get_result(-2)
369 ar = v.get_result(-2)
369 self.assertEquals(v['a'], 5)
370 self.assertEquals(v['a'], 5)
370 self.assertEquals(v['b'], 20)
371 self.assertEquals(v['b'], 20)
371 self.assertRaisesRemote(ZeroDivisionError, ar.get)
372 self.assertRaisesRemote(ZeroDivisionError, ar.get)
372
373
373 def test_magic_autopx_nonblocking(self):
374 def test_magic_autopx_nonblocking(self):
374 ip = get_ipython()
375 ip = get_ipython()
375 v = self.client[-1]
376 v = self.client[-1]
376 v.activate()
377 v.activate()
377 v.block=False
378 v.block=False
378
379
379 sio = StringIO()
380 sio = StringIO()
380 savestdout = sys.stdout
381 savestdout = sys.stdout
381 sys.stdout = sio
382 sys.stdout = sio
382 ip.magic_autopx()
383 ip.magic_autopx()
383 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
384 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
384 ip.run_cell('print b')
385 ip.run_cell('print b')
385 ip.run_cell("b/c")
386 ip.run_cell("b/c")
386 ip.run_code(compile('b*=2', '', 'single'))
387 ip.run_code(compile('b*=2', '', 'single'))
387 ip.magic_autopx()
388 ip.magic_autopx()
388 sys.stdout = savestdout
389 sys.stdout = savestdout
389 sio.read()
390 sio.read()
390 output = sio.buf.strip()
391 output = sio.buf.strip()
391 self.assertTrue(output.startswith('%autopx enabled'))
392 self.assertTrue(output.startswith('%autopx enabled'))
392 self.assertTrue(output.endswith('%autopx disabled'))
393 self.assertTrue(output.endswith('%autopx disabled'))
393 self.assertFalse('ZeroDivisionError' in output)
394 self.assertFalse('ZeroDivisionError' in output)
394 ar = v.get_result(-2)
395 ar = v.get_result(-2)
395 self.assertEquals(v['a'], 5)
396 self.assertEquals(v['a'], 5)
396 self.assertEquals(v['b'], 20)
397 self.assertEquals(v['b'], 20)
397 self.assertRaisesRemote(ZeroDivisionError, ar.get)
398 self.assertRaisesRemote(ZeroDivisionError, ar.get)
398
399
399 def test_magic_result(self):
400 def test_magic_result(self):
400 ip = get_ipython()
401 ip = get_ipython()
401 v = self.client[-1]
402 v = self.client[-1]
402 v.activate()
403 v.activate()
403 v['a'] = 111
404 v['a'] = 111
404 ra = v['a']
405 ra = v['a']
405
406
406 ar = ip.magic_result()
407 ar = ip.magic_result()
407 self.assertEquals(ar.msg_ids, [v.history[-1]])
408 self.assertEquals(ar.msg_ids, [v.history[-1]])
408 self.assertEquals(ar.get(), 111)
409 self.assertEquals(ar.get(), 111)
409 ar = ip.magic_result('-2')
410 ar = ip.magic_result('-2')
410 self.assertEquals(ar.msg_ids, [v.history[-2]])
411 self.assertEquals(ar.msg_ids, [v.history[-2]])
411
412
412
413
413
414
General Comments 0
You need to be logged in to leave comments. Login now