##// END OF EJS Templates
expedite IPython.parallel tests...
MinRK -
Show More
@@ -1,111 +1,120 b''
1 1 """toplevel setup/teardown for parallel tests."""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2011 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import tempfile
16 16 import time
17 17 from subprocess import Popen
18 18
19 19 from IPython.utils.path import get_ipython_dir
20 20 from IPython.parallel import Client
21 21 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
22 22 ipengine_cmd_argv,
23 23 ipcontroller_cmd_argv,
24 24 SIGKILL)
25 25
26 26 # globals
27 27 launchers = []
28 28 blackhole = open(os.devnull, 'w')
29 29
30 30 # Launcher class
31 31 class TestProcessLauncher(LocalProcessLauncher):
32 32 """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""
33 33 def start(self):
34 34 if self.state == 'before':
35 35 self.process = Popen(self.args,
36 36 stdout=blackhole, stderr=blackhole,
37 37 env=os.environ,
38 38 cwd=self.work_dir
39 39 )
40 40 self.notify_start(self.process.pid)
41 41 self.poll = self.process.poll
42 42 else:
43 43 s = 'The process was already started and has state: %r' % self.state
44 44 raise ProcessStateError(s)
45 45
46 46 # nose setup/teardown
47 47
48 48 def setup():
49 49 cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest')
50 50 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
51 51 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
52 52 for json in (engine_json, client_json):
53 53 if os.path.exists(json):
54 54 os.remove(json)
55 55
56 56 cp = TestProcessLauncher()
57 57 cp.cmd_and_args = ipcontroller_cmd_argv + \
58 58 ['--profile=iptest', '--log-level=50', '--ping=250']
59 59 cp.start()
60 60 launchers.append(cp)
61 61 tic = time.time()
62 62 while not os.path.exists(engine_json) or not os.path.exists(client_json):
63 63 if cp.poll() is not None:
64 64 print cp.poll()
65 65 raise RuntimeError("The test controller failed to start.")
66 66 elif time.time()-tic > 10:
67 67 raise RuntimeError("Timeout waiting for the test controller to start.")
68 68 time.sleep(0.1)
69 69 add_engines(1)
70 70
71 def add_engines(n=1, profile='iptest'):
71 def add_engines(n=1, profile='iptest', total=False):
72 """add a number of engines to a given profile.
73
74 If total is True, then already running engines are counted, and only
75 the additional engines necessary (if any) are started.
76 """
72 77 rc = Client(profile=profile)
73 78 base = len(rc)
79
80 if total:
81 n = max(n - base, 0)
82
74 83 eps = []
75 84 for i in range(n):
76 85 ep = TestProcessLauncher()
77 86 ep.cmd_and_args = ipengine_cmd_argv + ['--profile=%s'%profile, '--log-level=50']
78 87 ep.start()
79 88 launchers.append(ep)
80 89 eps.append(ep)
81 90 tic = time.time()
82 91 while len(rc) < base+n:
83 92 if any([ ep.poll() is not None for ep in eps ]):
84 93 raise RuntimeError("A test engine failed to start.")
85 94 elif time.time()-tic > 10:
86 95 raise RuntimeError("Timeout waiting for engines to connect.")
87 96 time.sleep(.1)
88 97 rc.spin()
89 98 rc.close()
90 99 return eps
91 100
92 101 def teardown():
93 102 time.sleep(1)
94 103 while launchers:
95 104 p = launchers.pop()
96 105 if p.poll() is None:
97 106 try:
98 107 p.stop()
99 108 except Exception, e:
100 109 print e
101 110 pass
102 111 if p.poll() is None:
103 112 time.sleep(.25)
104 113 if p.poll() is None:
105 114 try:
106 115 print 'cleaning up test process...'
107 116 p.signal(SIGKILL)
108 117 except:
109 118 print "couldn't shutdown process: ", p
110 119 blackhole.close()
111 120
@@ -1,137 +1,144 b''
1 1 """base class for parallel client tests
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 import sys
16 16 import tempfile
17 17 import time
18 18
19 19 from nose import SkipTest
20 20
21 21 import zmq
22 22 from zmq.tests import BaseZMQTestCase
23 23
24 24 from IPython.external.decorator import decorator
25 25
26 26 from IPython.parallel import error
27 27 from IPython.parallel import Client
28 28
29 29 from IPython.parallel.tests import launchers, add_engines
30 30
31 31 # simple tasks for use in apply tests
32 32
33 33 def segfault():
34 34 """this will segfault"""
35 35 import ctypes
36 36 ctypes.memset(-1,0,1)
37 37
38 38 def crash():
39 39 """from stdlib crashers in the test suite"""
40 40 import types
41 41 if sys.platform.startswith('win'):
42 42 import ctypes
43 43 ctypes.windll.kernel32.SetErrorMode(0x0002);
44 44 args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b'']
45 45 if sys.version_info[0] >= 3:
46 46 # Python3 adds 'kwonlyargcount' as the second argument to Code
47 47 args.insert(1, 0)
48 48
49 49 co = types.CodeType(*args)
50 50 exec(co)
51 51
52 52 def wait(n):
53 53 """sleep for a time"""
54 54 import time
55 55 time.sleep(n)
56 56 return n
57 57
58 58 def raiser(eclass):
59 59 """raise an exception"""
60 60 raise eclass()
61 61
62 62 # test decorator for skipping tests when libraries are unavailable
63 63 def skip_without(*names):
64 64 """skip a test if some names are not importable"""
65 65 @decorator
66 66 def skip_without_names(f, *args, **kwargs):
67 67 """decorator to skip tests in the absence of numpy."""
68 68 for name in names:
69 69 try:
70 70 __import__(name)
71 71 except ImportError:
72 72 raise SkipTest
73 73 return f(*args, **kwargs)
74 74 return skip_without_names
75 75
76 76 class ClusterTestCase(BaseZMQTestCase):
77 77
78 78 def add_engines(self, n=1, block=True):
79 79 """add multiple engines to our cluster"""
80 80 self.engines.extend(add_engines(n))
81 81 if block:
82 82 self.wait_on_engines()
83
84 def minimum_engines(self, n=1, block=True):
85 """add engines until there are at least n connected"""
86 self.engines.extend(add_engines(n, total=True))
87 if block:
88 self.wait_on_engines()
89
83 90
84 91 def wait_on_engines(self, timeout=5):
85 92 """wait for our engines to connect."""
86 93 n = len(self.engines)+self.base_engine_count
87 94 tic = time.time()
88 95 while time.time()-tic < timeout and len(self.client.ids) < n:
89 96 time.sleep(0.1)
90 97
91 98 assert not len(self.client.ids) < n, "waiting for engines timed out"
92 99
93 100 def connect_client(self):
94 101 """connect a client with my Context, and track its sockets for cleanup"""
95 102 c = Client(profile='iptest', context=self.context)
96 103 for name in filter(lambda n:n.endswith('socket'), dir(c)):
97 104 s = getattr(c, name)
98 105 s.setsockopt(zmq.LINGER, 0)
99 106 self.sockets.append(s)
100 107 return c
101 108
102 109 def assertRaisesRemote(self, etype, f, *args, **kwargs):
103 110 try:
104 111 try:
105 112 f(*args, **kwargs)
106 113 except error.CompositeError as e:
107 114 e.raise_exception()
108 115 except error.RemoteError as e:
109 116 self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(etype.__name__, e.ename))
110 117 else:
111 118 self.fail("should have raised a RemoteError")
112 119
113 120 def setUp(self):
114 121 BaseZMQTestCase.setUp(self)
115 122 self.client = self.connect_client()
116 123 # start every test with clean engine namespaces:
117 124 self.client.clear(block=True)
118 125 self.base_engine_count=len(self.client.ids)
119 126 self.engines=[]
120 127
121 128 def tearDown(self):
122 129 # self.client.clear(block=True)
123 130 # close fds:
124 131 for e in filter(lambda e: e.poll() is not None, launchers):
125 132 launchers.remove(e)
126 133
127 134 # allow flushing of incoming messages to prevent crash on socket close
128 135 self.client.wait(timeout=2)
129 136 # time.sleep(2)
130 137 self.client.spin()
131 138 self.client.close()
132 139 BaseZMQTestCase.tearDown(self)
133 140 # this will be redundant when pyzmq merges PR #88
134 141 # self.context.term()
135 142 # print tempfile.TemporaryFile().fileno(),
136 143 # sys.stdout.flush()
137 144 No newline at end of file
@@ -1,115 +1,115 b''
1 1 """Tests for asyncresult.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19
20 20 from IPython.parallel.error import TimeoutError
21 21
22 22 from IPython.parallel.tests import add_engines
23 23 from .clienttest import ClusterTestCase
24 24
25 25 def setup():
26 add_engines(2)
26 add_engines(2, total=True)
27 27
28 28 def wait(n):
29 29 import time
30 30 time.sleep(n)
31 31 return n
32 32
33 33 class AsyncResultTest(ClusterTestCase):
34 34
35 35 def test_single_result(self):
36 36 eid = self.client.ids[-1]
37 37 ar = self.client[eid].apply_async(lambda : 42)
38 38 self.assertEquals(ar.get(), 42)
39 39 ar = self.client[[eid]].apply_async(lambda : 42)
40 40 self.assertEquals(ar.get(), [42])
41 41 ar = self.client[-1:].apply_async(lambda : 42)
42 42 self.assertEquals(ar.get(), [42])
43 43
44 44 def test_get_after_done(self):
45 45 ar = self.client[-1].apply_async(lambda : 42)
46 46 ar.wait()
47 47 self.assertTrue(ar.ready())
48 48 self.assertEquals(ar.get(), 42)
49 49 self.assertEquals(ar.get(), 42)
50 50
51 51 def test_get_before_done(self):
52 52 ar = self.client[-1].apply_async(wait, 0.1)
53 53 self.assertRaises(TimeoutError, ar.get, 0)
54 54 ar.wait(0)
55 55 self.assertFalse(ar.ready())
56 56 self.assertEquals(ar.get(), 0.1)
57 57
58 58 def test_get_after_error(self):
59 59 ar = self.client[-1].apply_async(lambda : 1/0)
60 60 ar.wait(10)
61 61 self.assertRaisesRemote(ZeroDivisionError, ar.get)
62 62 self.assertRaisesRemote(ZeroDivisionError, ar.get)
63 63 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
64 64
65 65 def test_get_dict(self):
66 66 n = len(self.client)
67 67 ar = self.client[:].apply_async(lambda : 5)
68 68 self.assertEquals(ar.get(), [5]*n)
69 69 d = ar.get_dict()
70 70 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
71 71 for eid,r in d.iteritems():
72 72 self.assertEquals(r, 5)
73 73
74 74 def test_list_amr(self):
75 75 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
76 76 rlist = list(ar)
77 77
78 78 def test_getattr(self):
79 79 ar = self.client[:].apply_async(wait, 0.5)
80 80 self.assertRaises(AttributeError, lambda : ar._foo)
81 81 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
82 82 self.assertRaises(AttributeError, lambda : ar.foo)
83 83 self.assertRaises(AttributeError, lambda : ar.engine_id)
84 84 self.assertFalse(hasattr(ar, '__length_hint__'))
85 85 self.assertFalse(hasattr(ar, 'foo'))
86 86 self.assertFalse(hasattr(ar, 'engine_id'))
87 87 ar.get(5)
88 88 self.assertRaises(AttributeError, lambda : ar._foo)
89 89 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
90 90 self.assertRaises(AttributeError, lambda : ar.foo)
91 91 self.assertTrue(isinstance(ar.engine_id, list))
92 92 self.assertEquals(ar.engine_id, ar['engine_id'])
93 93 self.assertFalse(hasattr(ar, '__length_hint__'))
94 94 self.assertFalse(hasattr(ar, 'foo'))
95 95 self.assertTrue(hasattr(ar, 'engine_id'))
96 96
97 97 def test_getitem(self):
98 98 ar = self.client[:].apply_async(wait, 0.5)
99 99 self.assertRaises(TimeoutError, lambda : ar['foo'])
100 100 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
101 101 ar.get(5)
102 102 self.assertRaises(KeyError, lambda : ar['foo'])
103 103 self.assertTrue(isinstance(ar['engine_id'], list))
104 104 self.assertEquals(ar.engine_id, ar['engine_id'])
105 105
106 106 def test_single_result(self):
107 107 ar = self.client[-1].apply_async(wait, 0.5)
108 108 self.assertRaises(TimeoutError, lambda : ar['foo'])
109 109 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
110 110 self.assertTrue(ar.get(5) == 0.5)
111 111 self.assertTrue(isinstance(ar['engine_id'], int))
112 112 self.assertTrue(isinstance(ar.engine_id, int))
113 113 self.assertEquals(ar.engine_id, ar['engine_id'])
114 114
115 115
@@ -1,324 +1,319 b''
1 1 """Tests for parallel client.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 from __future__ import division
20 20
21 21 import time
22 22 from datetime import datetime
23 23 from tempfile import mktemp
24 24
25 25 import zmq
26 26
27 27 from IPython.parallel.client import client as clientmod
28 28 from IPython.parallel import error
29 29 from IPython.parallel import AsyncResult, AsyncHubResult
30 30 from IPython.parallel import LoadBalancedView, DirectView
31 31
32 32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33 33
34 34 def setup():
35 add_engines(4)
35 add_engines(4, total=True)
36 36
37 37 class TestClient(ClusterTestCase):
38 38
39 39 def test_ids(self):
40 40 n = len(self.client.ids)
41 self.add_engines(3)
42 self.assertEquals(len(self.client.ids), n+3)
41 self.add_engines(2)
42 self.assertEquals(len(self.client.ids), n+2)
43 43
44 44 def test_view_indexing(self):
45 45 """test index access for views"""
46 self.add_engines(2)
46 self.minimum_engines(4)
47 47 targets = self.client._build_targets('all')[-1]
48 48 v = self.client[:]
49 49 self.assertEquals(v.targets, targets)
50 50 t = self.client.ids[2]
51 51 v = self.client[t]
52 52 self.assert_(isinstance(v, DirectView))
53 53 self.assertEquals(v.targets, t)
54 54 t = self.client.ids[2:4]
55 55 v = self.client[t]
56 56 self.assert_(isinstance(v, DirectView))
57 57 self.assertEquals(v.targets, t)
58 58 v = self.client[::2]
59 59 self.assert_(isinstance(v, DirectView))
60 60 self.assertEquals(v.targets, targets[::2])
61 61 v = self.client[1::3]
62 62 self.assert_(isinstance(v, DirectView))
63 63 self.assertEquals(v.targets, targets[1::3])
64 64 v = self.client[:-3]
65 65 self.assert_(isinstance(v, DirectView))
66 66 self.assertEquals(v.targets, targets[:-3])
67 67 v = self.client[-1]
68 68 self.assert_(isinstance(v, DirectView))
69 69 self.assertEquals(v.targets, targets[-1])
70 70 self.assertRaises(TypeError, lambda : self.client[None])
71 71
72 72 def test_lbview_targets(self):
73 73 """test load_balanced_view targets"""
74 74 v = self.client.load_balanced_view()
75 75 self.assertEquals(v.targets, None)
76 76 v = self.client.load_balanced_view(-1)
77 77 self.assertEquals(v.targets, [self.client.ids[-1]])
78 78 v = self.client.load_balanced_view('all')
79 79 self.assertEquals(v.targets, None)
80 80
81 81 def test_dview_targets(self):
82 82 """test direct_view targets"""
83 83 v = self.client.direct_view()
84 84 self.assertEquals(v.targets, 'all')
85 85 v = self.client.direct_view('all')
86 86 self.assertEquals(v.targets, 'all')
87 87 v = self.client.direct_view(-1)
88 88 self.assertEquals(v.targets, self.client.ids[-1])
89 89
90 90 def test_lazy_all_targets(self):
91 91 """test lazy evaluation of rc.direct_view('all')"""
92 92 v = self.client.direct_view()
93 93 self.assertEquals(v.targets, 'all')
94 94
95 95 def double(x):
96 96 return x*2
97 97 seq = range(100)
98 98 ref = [ double(x) for x in seq ]
99 99
100 100 # add some engines, which should be used
101 self.add_engines(2)
101 self.add_engines(1)
102 102 n1 = len(self.client.ids)
103 103
104 104 # simple apply
105 105 r = v.apply_sync(lambda : 1)
106 106 self.assertEquals(r, [1] * n1)
107 107
108 108 # map goes through remotefunction
109 109 r = v.map_sync(double, seq)
110 110 self.assertEquals(r, ref)
111 111
112 112 # add a couple more engines, and try again
113 113 self.add_engines(2)
114 114 n2 = len(self.client.ids)
115 115 self.assertNotEquals(n2, n1)
116 116
117 117 # apply
118 118 r = v.apply_sync(lambda : 1)
119 119 self.assertEquals(r, [1] * n2)
120 120
121 121 # map
122 122 r = v.map_sync(double, seq)
123 123 self.assertEquals(r, ref)
124 124
125 125 def test_targets(self):
126 126 """test various valid targets arguments"""
127 127 build = self.client._build_targets
128 128 ids = self.client.ids
129 129 idents,targets = build(None)
130 130 self.assertEquals(ids, targets)
131 131
132 132 def test_clear(self):
133 133 """test clear behavior"""
134 # self.add_engines(2)
134 self.minimum_engines(2)
135 135 v = self.client[:]
136 136 v.block=True
137 137 v.push(dict(a=5))
138 138 v.pull('a')
139 139 id0 = self.client.ids[-1]
140 140 self.client.clear(targets=id0, block=True)
141 141 a = self.client[:-1].get('a')
142 142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 143 self.client.clear(block=True)
144 144 for i in self.client.ids:
145 # print i
146 145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147 146
148 147 def test_get_result(self):
149 148 """test getting results from the Hub."""
150 149 c = clientmod.Client(profile='iptest')
151 # self.add_engines(1)
152 150 t = c.ids[-1]
153 151 ar = c[t].apply_async(wait, 1)
154 152 # give the monitor time to notice the message
155 153 time.sleep(.25)
156 154 ahr = self.client.get_result(ar.msg_ids)
157 155 self.assertTrue(isinstance(ahr, AsyncHubResult))
158 156 self.assertEquals(ahr.get(), ar.get())
159 157 ar2 = self.client.get_result(ar.msg_ids)
160 158 self.assertFalse(isinstance(ar2, AsyncHubResult))
161 159 c.close()
162 160
163 161 def test_ids_list(self):
164 162 """test client.ids"""
165 # self.add_engines(2)
166 163 ids = self.client.ids
167 164 self.assertEquals(ids, self.client._ids)
168 165 self.assertFalse(ids is self.client._ids)
169 166 ids.remove(ids[-1])
170 167 self.assertNotEquals(ids, self.client._ids)
171 168
172 169 def test_queue_status(self):
173 # self.addEngine(4)
174 170 ids = self.client.ids
175 171 id0 = ids[0]
176 172 qs = self.client.queue_status(targets=id0)
177 173 self.assertTrue(isinstance(qs, dict))
178 174 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
179 175 allqs = self.client.queue_status()
180 176 self.assertTrue(isinstance(allqs, dict))
181 177 intkeys = list(allqs.keys())
182 178 intkeys.remove('unassigned')
183 179 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
184 180 unassigned = allqs.pop('unassigned')
185 181 for eid,qs in allqs.items():
186 182 self.assertTrue(isinstance(qs, dict))
187 183 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
188 184
189 185 def test_shutdown(self):
190 # self.addEngine(4)
191 186 ids = self.client.ids
192 187 id0 = ids[0]
193 188 self.client.shutdown(id0, block=True)
194 189 while id0 in self.client.ids:
195 190 time.sleep(0.1)
196 191 self.client.spin()
197 192
198 193 self.assertRaises(IndexError, lambda : self.client[id0])
199 194
200 195 def test_result_status(self):
201 196 pass
202 197 # to be written
203 198
204 199 def test_db_query_dt(self):
205 200 """test db query by date"""
206 201 hist = self.client.hub_history()
207 202 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
208 203 tic = middle['submitted']
209 204 before = self.client.db_query({'submitted' : {'$lt' : tic}})
210 205 after = self.client.db_query({'submitted' : {'$gte' : tic}})
211 206 self.assertEquals(len(before)+len(after),len(hist))
212 207 for b in before:
213 208 self.assertTrue(b['submitted'] < tic)
214 209 for a in after:
215 210 self.assertTrue(a['submitted'] >= tic)
216 211 same = self.client.db_query({'submitted' : tic})
217 212 for s in same:
218 213 self.assertTrue(s['submitted'] == tic)
219 214
220 215 def test_db_query_keys(self):
221 216 """test extracting subset of record keys"""
222 217 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
223 218 for rec in found:
224 219 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
225 220
226 221 def test_db_query_default_keys(self):
227 222 """default db_query excludes buffers"""
228 223 found = self.client.db_query({'msg_id': {'$ne' : ''}})
229 224 for rec in found:
230 225 keys = set(rec.keys())
231 226 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
232 227 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
233 228
234 229 def test_db_query_msg_id(self):
235 230 """ensure msg_id is always in db queries"""
236 231 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
237 232 for rec in found:
238 233 self.assertTrue('msg_id' in rec.keys())
239 234 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
240 235 for rec in found:
241 236 self.assertTrue('msg_id' in rec.keys())
242 237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
243 238 for rec in found:
244 239 self.assertTrue('msg_id' in rec.keys())
245 240
246 241 def test_db_query_in(self):
247 242 """test db query with '$in','$nin' operators"""
248 243 hist = self.client.hub_history()
249 244 even = hist[::2]
250 245 odd = hist[1::2]
251 246 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
252 247 found = [ r['msg_id'] for r in recs ]
253 248 self.assertEquals(set(even), set(found))
254 249 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
255 250 found = [ r['msg_id'] for r in recs ]
256 251 self.assertEquals(set(odd), set(found))
257 252
258 253 def test_hub_history(self):
259 254 hist = self.client.hub_history()
260 255 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
261 256 recdict = {}
262 257 for rec in recs:
263 258 recdict[rec['msg_id']] = rec
264 259
265 260 latest = datetime(1984,1,1)
266 261 for msg_id in hist:
267 262 rec = recdict[msg_id]
268 263 newt = rec['submitted']
269 264 self.assertTrue(newt >= latest)
270 265 latest = newt
271 266 ar = self.client[-1].apply_async(lambda : 1)
272 267 ar.get()
273 268 time.sleep(0.25)
274 269 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
275 270
276 271 def test_resubmit(self):
277 272 def f():
278 273 import random
279 274 return random.random()
280 275 v = self.client.load_balanced_view()
281 276 ar = v.apply_async(f)
282 277 r1 = ar.get(1)
283 278 # give the Hub a chance to notice:
284 279 time.sleep(0.5)
285 280 ahr = self.client.resubmit(ar.msg_ids)
286 281 r2 = ahr.get(1)
287 282 self.assertFalse(r1 == r2)
288 283
289 284 def test_resubmit_inflight(self):
290 285 """ensure ValueError on resubmit of inflight task"""
291 286 v = self.client.load_balanced_view()
292 287 ar = v.apply_async(time.sleep,1)
293 288 # give the message a chance to arrive
294 289 time.sleep(0.2)
295 290 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
296 291 ar.get(2)
297 292
298 293 def test_resubmit_badkey(self):
299 294 """ensure KeyError on resubmit of nonexistant task"""
300 295 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
301 296
302 297 def test_purge_results(self):
303 298 # ensure there are some tasks
304 299 for i in range(5):
305 300 self.client[:].apply_sync(lambda : 1)
306 301 # Wait for the Hub to realise the result is done:
307 302 # This prevents a race condition, where we
308 303 # might purge a result the Hub still thinks is pending.
309 304 time.sleep(0.1)
310 305 rc2 = clientmod.Client(profile='iptest')
311 306 hist = self.client.hub_history()
312 307 ahr = rc2.get_result([hist[-1]])
313 308 ahr.wait(10)
314 309 self.client.purge_results(hist[-1])
315 310 newhist = self.client.hub_history()
316 311 self.assertEquals(len(newhist)+1,len(hist))
317 312 rc2.spin()
318 313 rc2.close()
319 314
320 315 def test_purge_all_results(self):
321 316 self.client.purge_results('all')
322 317 hist = self.client.hub_history()
323 318 self.assertEquals(len(hist), 0)
324 319
@@ -1,106 +1,106 b''
1 1 """Tests for dependency.py
2 2
3 3 Authors:
4 4
5 5 * Min RK
6 6 """
7 7
8 8 __docformat__ = "restructuredtext en"
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Copyright (C) 2011 The IPython Development Team
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #-------------------------------------------------------------------------------
16 16
17 17 #-------------------------------------------------------------------------------
18 18 # Imports
19 19 #-------------------------------------------------------------------------------
20 20
21 21 # import
22 22 import os
23 23
24 24 from IPython.utils.pickleutil import can, uncan
25 25
26 26 import IPython.parallel as pmod
27 27 from IPython.parallel.util import interactive
28 28
29 29 from IPython.parallel.tests import add_engines
30 30 from .clienttest import ClusterTestCase
31 31
32 32 def setup():
33 add_engines(1)
33 add_engines(1, total=True)
34 34
35 35 @pmod.require('time')
36 36 def wait(n):
37 37 time.sleep(n)
38 38 return n
39 39
40 40 mixed = map(str, range(10))
41 41 completed = map(str, range(0,10,2))
42 42 failed = map(str, range(1,10,2))
43 43
44 44 class DependencyTest(ClusterTestCase):
45 45
46 46 def setUp(self):
47 47 ClusterTestCase.setUp(self)
48 48 self.user_ns = {'__builtins__' : __builtins__}
49 49 self.view = self.client.load_balanced_view()
50 50 self.dview = self.client[-1]
51 51 self.succeeded = set(map(str, range(0,25,2)))
52 52 self.failed = set(map(str, range(1,25,2)))
53 53
54 54 def assertMet(self, dep):
55 55 self.assertTrue(dep.check(self.succeeded, self.failed), "Dependency should be met")
56 56
57 57 def assertUnmet(self, dep):
58 58 self.assertFalse(dep.check(self.succeeded, self.failed), "Dependency should not be met")
59 59
60 60 def assertUnreachable(self, dep):
61 61 self.assertTrue(dep.unreachable(self.succeeded, self.failed), "Dependency should be unreachable")
62 62
63 63 def assertReachable(self, dep):
64 64 self.assertFalse(dep.unreachable(self.succeeded, self.failed), "Dependency should be reachable")
65 65
66 66 def cancan(self, f):
67 67 """decorator to pass through canning into self.user_ns"""
68 68 return uncan(can(f), self.user_ns)
69 69
70 70 def test_require_imports(self):
71 71 """test that @require imports names"""
72 72 @self.cancan
73 73 @pmod.require('urllib')
74 74 @interactive
75 75 def encode(dikt):
76 76 return urllib.urlencode(dikt)
77 77 # must pass through canning to properly connect namespaces
78 78 self.assertEquals(encode(dict(a=5)), 'a=5')
79 79
80 80 def test_success_only(self):
81 81 dep = pmod.Dependency(mixed, success=True, failure=False)
82 82 self.assertUnmet(dep)
83 83 self.assertUnreachable(dep)
84 84 dep.all=False
85 85 self.assertMet(dep)
86 86 self.assertReachable(dep)
87 87 dep = pmod.Dependency(completed, success=True, failure=False)
88 88 self.assertMet(dep)
89 89 self.assertReachable(dep)
90 90 dep.all=False
91 91 self.assertMet(dep)
92 92 self.assertReachable(dep)
93 93
94 94 def test_failure_only(self):
95 95 dep = pmod.Dependency(mixed, success=False, failure=True)
96 96 self.assertUnmet(dep)
97 97 self.assertUnreachable(dep)
98 98 dep.all=False
99 99 self.assertMet(dep)
100 100 self.assertReachable(dep)
101 101 dep = pmod.Dependency(completed, success=False, failure=True)
102 102 self.assertUnmet(dep)
103 103 self.assertUnreachable(dep)
104 104 dep.all=False
105 105 self.assertUnmet(dep)
106 106 self.assertUnreachable(dep)
@@ -1,178 +1,176 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test LoadBalancedView objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import time
21 21
22 22 import zmq
23 23 from nose import SkipTest
24 24
25 25 from IPython import parallel as pmod
26 26 from IPython.parallel import error
27 27
28 28 from IPython.parallel.tests import add_engines
29 29
30 30 from .clienttest import ClusterTestCase, crash, wait, skip_without
31 31
32 32 def setup():
33 add_engines(3)
33 add_engines(3, total=True)
34 34
35 35 class TestLoadBalancedView(ClusterTestCase):
36 36
37 37 def setUp(self):
38 38 ClusterTestCase.setUp(self)
39 39 self.view = self.client.load_balanced_view()
40 40
41 41 def test_z_crash_task(self):
42 42 """test graceful handling of engine death (balanced)"""
43 43 raise SkipTest("crash tests disabled, due to undesirable crash reports")
44 44 # self.add_engines(1)
45 45 ar = self.view.apply_async(crash)
46 46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
47 47 eid = ar.engine_id
48 48 tic = time.time()
49 49 while eid in self.client.ids and time.time()-tic < 5:
50 50 time.sleep(.01)
51 51 self.client.spin()
52 52 self.assertFalse(eid in self.client.ids, "Engine should have died")
53 53
54 54 def test_map(self):
55 55 def f(x):
56 56 return x**2
57 57 data = range(16)
58 58 r = self.view.map_sync(f, data)
59 59 self.assertEquals(r, map(f, data))
60 60
61 61 def test_map_unordered(self):
62 62 def f(x):
63 63 return x**2
64 64 def slow_f(x):
65 65 import time
66 66 time.sleep(0.05*x)
67 67 return x**2
68 68 data = range(16,0,-1)
69 69 reference = map(f, data)
70 70
71 71 amr = self.view.map_async(slow_f, data, ordered=False)
72 72 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
73 73 # check individual elements, retrieved as they come
74 74 # list comprehension uses __iter__
75 75 astheycame = [ r for r in amr ]
76 76 # Ensure that at least one result came out of order:
77 77 self.assertNotEquals(astheycame, reference, "should not have preserved order")
78 78 self.assertEquals(sorted(astheycame, reverse=True), reference, "result corrupted")
79 79
80 80 def test_map_ordered(self):
81 81 def f(x):
82 82 return x**2
83 83 def slow_f(x):
84 84 import time
85 85 time.sleep(0.05*x)
86 86 return x**2
87 87 data = range(16,0,-1)
88 88 reference = map(f, data)
89 89
90 90 amr = self.view.map_async(slow_f, data)
91 91 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
92 92 # check individual elements, retrieved as they come
93 93 # list(amr) uses __iter__
94 94 astheycame = list(amr)
95 95 # Ensure that results came in order
96 96 self.assertEquals(astheycame, reference)
97 97 self.assertEquals(amr.result, reference)
98 98
99 99 def test_map_iterable(self):
100 100 """test map on iterables (balanced)"""
101 101 view = self.view
102 102 # 101 is prime, so it won't be evenly distributed
103 103 arr = range(101)
104 104 # so that it will be an iterator, even in Python 3
105 105 it = iter(arr)
106 106 r = view.map_sync(lambda x:x, arr)
107 107 self.assertEquals(r, list(arr))
108 108
109 109
110 110 def test_abort(self):
111 111 view = self.view
112 112 ar = self.client[:].apply_async(time.sleep, .5)
113 113 ar = self.client[:].apply_async(time.sleep, .5)
114 114 time.sleep(0.2)
115 115 ar2 = view.apply_async(lambda : 2)
116 116 ar3 = view.apply_async(lambda : 3)
117 117 view.abort(ar2)
118 118 view.abort(ar3.msg_ids)
119 119 self.assertRaises(error.TaskAborted, ar2.get)
120 120 self.assertRaises(error.TaskAborted, ar3.get)
121 121
122 122 def test_retries(self):
123 add_engines(3)
124 123 view = self.view
125 124 view.timeout = 1 # prevent hang if this doesn't behave
126 125 def fail():
127 126 assert False
128 127 for r in range(len(self.client)-1):
129 128 with view.temp_flags(retries=r):
130 129 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
131 130
132 131 with view.temp_flags(retries=len(self.client), timeout=0.25):
133 132 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
134 133
135 134 def test_invalid_dependency(self):
136 135 view = self.view
137 136 with view.temp_flags(after='12345'):
138 137 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
139 138
140 139 def test_impossible_dependency(self):
141 if len(self.client) < 2:
142 add_engines(2)
140 self.minimum_engines(2)
143 141 view = self.client.load_balanced_view()
144 142 ar1 = view.apply_async(lambda : 1)
145 143 ar1.get()
146 144 e1 = ar1.engine_id
147 145 e2 = e1
148 146 while e2 == e1:
149 147 ar2 = view.apply_async(lambda : 1)
150 148 ar2.get()
151 149 e2 = ar2.engine_id
152 150
153 151 with view.temp_flags(follow=[ar1, ar2]):
154 152 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
155 153
156 154
157 155 def test_follow(self):
158 156 ar = self.view.apply_async(lambda : 1)
159 157 ar.get()
160 158 ars = []
161 159 first_id = ar.engine_id
162 160
163 161 self.view.follow = ar
164 162 for i in range(5):
165 163 ars.append(self.view.apply_async(lambda : 1))
166 164 self.view.wait(ars)
167 165 for ar in ars:
168 166 self.assertEquals(ar.engine_id, first_id)
169 167
170 168 def test_after(self):
171 169 view = self.view
172 170 ar = view.apply_async(time.sleep, 0.5)
173 171 with view.temp_flags(after=ar):
174 172 ar2 = view.apply_async(lambda : 1)
175 173
176 174 ar.wait()
177 175 ar2.wait()
178 176 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
@@ -1,507 +1,507 b''
1 1 # -*- coding: utf-8 -*-
2 2 """test View objects
3 3
4 4 Authors:
5 5
6 6 * Min RK
7 7 """
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import time
21 21 from tempfile import mktemp
22 22 from StringIO import StringIO
23 23
24 24 import zmq
25 25 from nose import SkipTest
26 26
27 27 from IPython.testing import decorators as dec
28 28
29 29 from IPython import parallel as pmod
30 30 from IPython.parallel import error
31 31 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
32 32 from IPython.parallel import DirectView
33 33 from IPython.parallel.util import interactive
34 34
35 35 from IPython.parallel.tests import add_engines
36 36
37 37 from .clienttest import ClusterTestCase, crash, wait, skip_without
38 38
39 39 def setup():
40 add_engines(3)
40 add_engines(3, total=True)
41 41
42 42 class TestView(ClusterTestCase):
43 43
44 44 def test_z_crash_mux(self):
45 45 """test graceful handling of engine death (direct)"""
46 46 raise SkipTest("crash tests disabled, due to undesirable crash reports")
47 47 # self.add_engines(1)
48 48 eid = self.client.ids[-1]
49 49 ar = self.client[eid].apply_async(crash)
50 50 self.assertRaisesRemote(error.EngineError, ar.get, 10)
51 51 eid = ar.engine_id
52 52 tic = time.time()
53 53 while eid in self.client.ids and time.time()-tic < 5:
54 54 time.sleep(.01)
55 55 self.client.spin()
56 56 self.assertFalse(eid in self.client.ids, "Engine should have died")
57 57
58 58 def test_push_pull(self):
59 59 """test pushing and pulling"""
60 60 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
61 61 t = self.client.ids[-1]
62 62 v = self.client[t]
63 63 push = v.push
64 64 pull = v.pull
65 65 v.block=True
66 66 nengines = len(self.client)
67 67 push({'data':data})
68 68 d = pull('data')
69 69 self.assertEquals(d, data)
70 70 self.client[:].push({'data':data})
71 71 d = self.client[:].pull('data', block=True)
72 72 self.assertEquals(d, nengines*[data])
73 73 ar = push({'data':data}, block=False)
74 74 self.assertTrue(isinstance(ar, AsyncResult))
75 75 r = ar.get()
76 76 ar = self.client[:].pull('data', block=False)
77 77 self.assertTrue(isinstance(ar, AsyncResult))
78 78 r = ar.get()
79 79 self.assertEquals(r, nengines*[data])
80 80 self.client[:].push(dict(a=10,b=20))
81 81 r = self.client[:].pull(('a','b'), block=True)
82 82 self.assertEquals(r, nengines*[[10,20]])
83 83
84 84 def test_push_pull_function(self):
85 85 "test pushing and pulling functions"
86 86 def testf(x):
87 87 return 2.0*x
88 88
89 89 t = self.client.ids[-1]
90 90 v = self.client[t]
91 91 v.block=True
92 92 push = v.push
93 93 pull = v.pull
94 94 execute = v.execute
95 95 push({'testf':testf})
96 96 r = pull('testf')
97 97 self.assertEqual(r(1.0), testf(1.0))
98 98 execute('r = testf(10)')
99 99 r = pull('r')
100 100 self.assertEquals(r, testf(10))
101 101 ar = self.client[:].push({'testf':testf}, block=False)
102 102 ar.get()
103 103 ar = self.client[:].pull('testf', block=False)
104 104 rlist = ar.get()
105 105 for r in rlist:
106 106 self.assertEqual(r(1.0), testf(1.0))
107 107 execute("def g(x): return x*x")
108 108 r = pull(('testf','g'))
109 109 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
110 110
111 111 def test_push_function_globals(self):
112 112 """test that pushed functions have access to globals"""
113 113 @interactive
114 114 def geta():
115 115 return a
116 116 # self.add_engines(1)
117 117 v = self.client[-1]
118 118 v.block=True
119 119 v['f'] = geta
120 120 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
121 121 v.execute('a=5')
122 122 v.execute('b=f()')
123 123 self.assertEquals(v['b'], 5)
124 124
125 125 def test_push_function_defaults(self):
126 126 """test that pushed functions preserve default args"""
127 127 def echo(a=10):
128 128 return a
129 129 v = self.client[-1]
130 130 v.block=True
131 131 v['f'] = echo
132 132 v.execute('b=f()')
133 133 self.assertEquals(v['b'], 10)
134 134
135 135 def test_get_result(self):
136 136 """test getting results from the Hub."""
137 137 c = pmod.Client(profile='iptest')
138 138 # self.add_engines(1)
139 139 t = c.ids[-1]
140 140 v = c[t]
141 141 v2 = self.client[t]
142 142 ar = v.apply_async(wait, 1)
143 143 # give the monitor time to notice the message
144 144 time.sleep(.25)
145 145 ahr = v2.get_result(ar.msg_ids)
146 146 self.assertTrue(isinstance(ahr, AsyncHubResult))
147 147 self.assertEquals(ahr.get(), ar.get())
148 148 ar2 = v2.get_result(ar.msg_ids)
149 149 self.assertFalse(isinstance(ar2, AsyncHubResult))
150 150 c.spin()
151 151 c.close()
152 152
153 153 def test_run_newline(self):
154 154 """test that run appends newline to files"""
155 155 tmpfile = mktemp()
156 156 with open(tmpfile, 'w') as f:
157 157 f.write("""def g():
158 158 return 5
159 159 """)
160 160 v = self.client[-1]
161 161 v.run(tmpfile, block=True)
162 162 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
163 163
164 164 def test_apply_tracked(self):
165 165 """test tracking for apply"""
166 166 # self.add_engines(1)
167 167 t = self.client.ids[-1]
168 168 v = self.client[t]
169 169 v.block=False
170 170 def echo(n=1024*1024, **kwargs):
171 171 with v.temp_flags(**kwargs):
172 172 return v.apply(lambda x: x, 'x'*n)
173 173 ar = echo(1, track=False)
174 174 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
175 175 self.assertTrue(ar.sent)
176 176 ar = echo(track=True)
177 177 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
178 178 self.assertEquals(ar.sent, ar._tracker.done)
179 179 ar._tracker.wait()
180 180 self.assertTrue(ar.sent)
181 181
182 182 def test_push_tracked(self):
183 183 t = self.client.ids[-1]
184 184 ns = dict(x='x'*1024*1024)
185 185 v = self.client[t]
186 186 ar = v.push(ns, block=False, track=False)
187 187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 188 self.assertTrue(ar.sent)
189 189
190 190 ar = v.push(ns, block=False, track=True)
191 191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
192 192 ar._tracker.wait()
193 193 self.assertEquals(ar.sent, ar._tracker.done)
194 194 self.assertTrue(ar.sent)
195 195 ar.get()
196 196
197 197 def test_scatter_tracked(self):
198 198 t = self.client.ids
199 199 x='x'*1024*1024
200 200 ar = self.client[t].scatter('x', x, block=False, track=False)
201 201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 202 self.assertTrue(ar.sent)
203 203
204 204 ar = self.client[t].scatter('x', x, block=False, track=True)
205 205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
206 206 self.assertEquals(ar.sent, ar._tracker.done)
207 207 ar._tracker.wait()
208 208 self.assertTrue(ar.sent)
209 209 ar.get()
210 210
211 211 def test_remote_reference(self):
212 212 v = self.client[-1]
213 213 v['a'] = 123
214 214 ra = pmod.Reference('a')
215 215 b = v.apply_sync(lambda x: x, ra)
216 216 self.assertEquals(b, 123)
217 217
218 218
219 219 def test_scatter_gather(self):
220 220 view = self.client[:]
221 221 seq1 = range(16)
222 222 view.scatter('a', seq1)
223 223 seq2 = view.gather('a', block=True)
224 224 self.assertEquals(seq2, seq1)
225 225 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
226 226
227 227 @skip_without('numpy')
228 228 def test_scatter_gather_numpy(self):
229 229 import numpy
230 230 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
231 231 view = self.client[:]
232 232 a = numpy.arange(64)
233 233 view.scatter('a', a)
234 234 b = view.gather('a', block=True)
235 235 assert_array_equal(b, a)
236 236
237 237 def test_map(self):
238 238 view = self.client[:]
239 239 def f(x):
240 240 return x**2
241 241 data = range(16)
242 242 r = view.map_sync(f, data)
243 243 self.assertEquals(r, map(f, data))
244 244
245 245 def test_map_iterable(self):
246 246 """test map on iterables (direct)"""
247 247 view = self.client[:]
248 248 # 101 is prime, so it won't be evenly distributed
249 249 arr = range(101)
250 250 # ensure it will be an iterator, even in Python 3
251 251 it = iter(arr)
252 252 r = view.map_sync(lambda x:x, arr)
253 253 self.assertEquals(r, list(arr))
254 254
255 255 def test_scatterGatherNonblocking(self):
256 256 data = range(16)
257 257 view = self.client[:]
258 258 view.scatter('a', data, block=False)
259 259 ar = view.gather('a', block=False)
260 260 self.assertEquals(ar.get(), data)
261 261
262 262 @skip_without('numpy')
263 263 def test_scatter_gather_numpy_nonblocking(self):
264 264 import numpy
265 265 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
266 266 a = numpy.arange(64)
267 267 view = self.client[:]
268 268 ar = view.scatter('a', a, block=False)
269 269 self.assertTrue(isinstance(ar, AsyncResult))
270 270 amr = view.gather('a', block=False)
271 271 self.assertTrue(isinstance(amr, AsyncMapResult))
272 272 assert_array_equal(amr.get(), a)
273 273
274 274 def test_execute(self):
275 275 view = self.client[:]
276 276 # self.client.debug=True
277 277 execute = view.execute
278 278 ar = execute('c=30', block=False)
279 279 self.assertTrue(isinstance(ar, AsyncResult))
280 280 ar = execute('d=[0,1,2]', block=False)
281 281 self.client.wait(ar, 1)
282 282 self.assertEquals(len(ar.get()), len(self.client))
283 283 for c in view['c']:
284 284 self.assertEquals(c, 30)
285 285
286 286 def test_abort(self):
287 287 view = self.client[-1]
288 288 ar = view.execute('import time; time.sleep(1)', block=False)
289 289 ar2 = view.apply_async(lambda : 2)
290 290 ar3 = view.apply_async(lambda : 3)
291 291 view.abort(ar2)
292 292 view.abort(ar3.msg_ids)
293 293 self.assertRaises(error.TaskAborted, ar2.get)
294 294 self.assertRaises(error.TaskAborted, ar3.get)
295 295
296 296 def test_abort_all(self):
297 297 """view.abort() aborts all outstanding tasks"""
298 298 view = self.client[-1]
299 ars = [ view.apply_async(time.sleep, 1) for i in range(10) ]
299 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
300 300 view.abort()
301 301 view.wait(timeout=5)
302 302 for ar in ars[5:]:
303 303 self.assertRaises(error.TaskAborted, ar.get)
304 304
305 305 def test_temp_flags(self):
306 306 view = self.client[-1]
307 307 view.block=True
308 308 with view.temp_flags(block=False):
309 309 self.assertFalse(view.block)
310 310 self.assertTrue(view.block)
311 311
312 312 @dec.known_failure_py3
313 313 def test_importer(self):
314 314 view = self.client[-1]
315 315 view.clear(block=True)
316 316 with view.importer:
317 317 import re
318 318
319 319 @interactive
320 320 def findall(pat, s):
321 321 # this globals() step isn't necessary in real code
322 322 # only to prevent a closure in the test
323 323 re = globals()['re']
324 324 return re.findall(pat, s)
325 325
326 326 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
327 327
328 328 # parallel magic tests
329 329
330 330 def test_magic_px_blocking(self):
331 331 ip = get_ipython()
332 332 v = self.client[-1]
333 333 v.activate()
334 334 v.block=True
335 335
336 336 ip.magic_px('a=5')
337 337 self.assertEquals(v['a'], 5)
338 338 ip.magic_px('a=10')
339 339 self.assertEquals(v['a'], 10)
340 340 sio = StringIO()
341 341 savestdout = sys.stdout
342 342 sys.stdout = sio
343 343 # just 'print a' worst ~99% of the time, but this ensures that
344 344 # the stdout message has arrived when the result is finished:
345 345 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
346 346 sys.stdout = savestdout
347 347 buf = sio.getvalue()
348 348 self.assertTrue('[stdout:' in buf, buf)
349 349 self.assertTrue(buf.rstrip().endswith('10'))
350 350 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
351 351
352 352 def test_magic_px_nonblocking(self):
353 353 ip = get_ipython()
354 354 v = self.client[-1]
355 355 v.activate()
356 356 v.block=False
357 357
358 358 ip.magic_px('a=5')
359 359 self.assertEquals(v['a'], 5)
360 360 ip.magic_px('a=10')
361 361 self.assertEquals(v['a'], 10)
362 362 sio = StringIO()
363 363 savestdout = sys.stdout
364 364 sys.stdout = sio
365 365 ip.magic_px('print a')
366 366 sys.stdout = savestdout
367 367 buf = sio.getvalue()
368 368 self.assertFalse('[stdout:%i]'%v.targets in buf)
369 369 ip.magic_px('1/0')
370 370 ar = v.get_result(-1)
371 371 self.assertRaisesRemote(ZeroDivisionError, ar.get)
372 372
373 373 def test_magic_autopx_blocking(self):
374 374 ip = get_ipython()
375 375 v = self.client[-1]
376 376 v.activate()
377 377 v.block=True
378 378
379 379 sio = StringIO()
380 380 savestdout = sys.stdout
381 381 sys.stdout = sio
382 382 ip.magic_autopx()
383 383 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
384 384 ip.run_cell('print b')
385 385 ip.run_cell("b/c")
386 386 ip.run_code(compile('b*=2', '', 'single'))
387 387 ip.magic_autopx()
388 388 sys.stdout = savestdout
389 389 output = sio.getvalue().strip()
390 390 self.assertTrue(output.startswith('%autopx enabled'))
391 391 self.assertTrue(output.endswith('%autopx disabled'))
392 392 self.assertTrue('RemoteError: ZeroDivisionError' in output)
393 393 ar = v.get_result(-2)
394 394 self.assertEquals(v['a'], 5)
395 395 self.assertEquals(v['b'], 20)
396 396 self.assertRaisesRemote(ZeroDivisionError, ar.get)
397 397
398 398 def test_magic_autopx_nonblocking(self):
399 399 ip = get_ipython()
400 400 v = self.client[-1]
401 401 v.activate()
402 402 v.block=False
403 403
404 404 sio = StringIO()
405 405 savestdout = sys.stdout
406 406 sys.stdout = sio
407 407 ip.magic_autopx()
408 408 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
409 409 ip.run_cell('print b')
410 410 ip.run_cell("b/c")
411 411 ip.run_code(compile('b*=2', '', 'single'))
412 412 ip.magic_autopx()
413 413 sys.stdout = savestdout
414 414 output = sio.getvalue().strip()
415 415 self.assertTrue(output.startswith('%autopx enabled'))
416 416 self.assertTrue(output.endswith('%autopx disabled'))
417 417 self.assertFalse('ZeroDivisionError' in output)
418 418 ar = v.get_result(-2)
419 419 self.assertEquals(v['a'], 5)
420 420 self.assertEquals(v['b'], 20)
421 421 self.assertRaisesRemote(ZeroDivisionError, ar.get)
422 422
423 423 def test_magic_result(self):
424 424 ip = get_ipython()
425 425 v = self.client[-1]
426 426 v.activate()
427 427 v['a'] = 111
428 428 ra = v['a']
429 429
430 430 ar = ip.magic_result()
431 431 self.assertEquals(ar.msg_ids, [v.history[-1]])
432 432 self.assertEquals(ar.get(), 111)
433 433 ar = ip.magic_result('-2')
434 434 self.assertEquals(ar.msg_ids, [v.history[-2]])
435 435
436 436 def test_unicode_execute(self):
437 437 """test executing unicode strings"""
438 438 v = self.client[-1]
439 439 v.block=True
440 440 if sys.version_info[0] >= 3:
441 441 code="a='é'"
442 442 else:
443 443 code=u"a=u'é'"
444 444 v.execute(code)
445 445 self.assertEquals(v['a'], u'é')
446 446
447 447 def test_unicode_apply_result(self):
448 448 """test unicode apply results"""
449 449 v = self.client[-1]
450 450 r = v.apply_sync(lambda : u'é')
451 451 self.assertEquals(r, u'é')
452 452
453 453 def test_unicode_apply_arg(self):
454 454 """test passing unicode arguments to apply"""
455 455 v = self.client[-1]
456 456
457 457 @interactive
458 458 def check_unicode(a, check):
459 459 assert isinstance(a, unicode), "%r is not unicode"%a
460 460 assert isinstance(check, bytes), "%r is not bytes"%check
461 461 assert a.encode('utf8') == check, "%s != %s"%(a,check)
462 462
463 463 for s in [ u'é', u'ßø®∫',u'asdf' ]:
464 464 try:
465 465 v.apply_sync(check_unicode, s, s.encode('utf8'))
466 466 except error.RemoteError as e:
467 467 if e.ename == 'AssertionError':
468 468 self.fail(e.evalue)
469 469 else:
470 470 raise e
471 471
472 472 def test_map_reference(self):
473 473 """view.map(<Reference>, *seqs) should work"""
474 474 v = self.client[:]
475 475 v.scatter('n', self.client.ids, flatten=True)
476 476 v.execute("f = lambda x,y: x*y")
477 477 rf = pmod.Reference('f')
478 478 nlist = list(range(10))
479 479 mlist = nlist[::-1]
480 480 expected = [ m*n for m,n in zip(mlist, nlist) ]
481 481 result = v.map_sync(rf, mlist, nlist)
482 482 self.assertEquals(result, expected)
483 483
484 484 def test_apply_reference(self):
485 485 """view.apply(<Reference>, *args) should work"""
486 486 v = self.client[:]
487 487 v.scatter('n', self.client.ids, flatten=True)
488 488 v.execute("f = lambda x: n*x")
489 489 rf = pmod.Reference('f')
490 490 result = v.apply_sync(rf, 5)
491 491 expected = [ 5*id for id in self.client.ids ]
492 492 self.assertEquals(result, expected)
493 493
494 494 def test_eval_reference(self):
495 495 v = self.client[self.client.ids[0]]
496 496 v['g'] = range(5)
497 497 rg = pmod.Reference('g[0]')
498 498 echo = lambda x:x
499 499 self.assertEquals(v.apply_sync(echo, rg), 0)
500 500
501 501 def test_reference_nameerror(self):
502 502 v = self.client[self.client.ids[0]]
503 503 r = pmod.Reference('elvis_has_left')
504 504 echo = lambda x:x
505 505 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
506 506
507 507
General Comments 0
You need to be logged in to leave comments. Login now