##// END OF EJS Templates
expedite IPython.parallel tests...
MinRK -
Show More
@@ -1,111 +1,120 b''
1 """toplevel setup/teardown for parallel tests."""
1 """toplevel setup/teardown for parallel tests."""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
4 # Copyright (C) 2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import os
14 import os
15 import tempfile
15 import tempfile
16 import time
16 import time
17 from subprocess import Popen
17 from subprocess import Popen
18
18
19 from IPython.utils.path import get_ipython_dir
19 from IPython.utils.path import get_ipython_dir
20 from IPython.parallel import Client
20 from IPython.parallel import Client
21 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
21 from IPython.parallel.apps.launcher import (LocalProcessLauncher,
22 ipengine_cmd_argv,
22 ipengine_cmd_argv,
23 ipcontroller_cmd_argv,
23 ipcontroller_cmd_argv,
24 SIGKILL)
24 SIGKILL)
25
25
26 # globals
26 # globals
27 launchers = []
27 launchers = []
28 blackhole = open(os.devnull, 'w')
28 blackhole = open(os.devnull, 'w')
29
29
30 # Launcher class
30 # Launcher class
31 class TestProcessLauncher(LocalProcessLauncher):
31 class TestProcessLauncher(LocalProcessLauncher):
32 """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""
32 """subclass LocalProcessLauncher, to prevent extra sockets and threads being created on Windows"""
33 def start(self):
33 def start(self):
34 if self.state == 'before':
34 if self.state == 'before':
35 self.process = Popen(self.args,
35 self.process = Popen(self.args,
36 stdout=blackhole, stderr=blackhole,
36 stdout=blackhole, stderr=blackhole,
37 env=os.environ,
37 env=os.environ,
38 cwd=self.work_dir
38 cwd=self.work_dir
39 )
39 )
40 self.notify_start(self.process.pid)
40 self.notify_start(self.process.pid)
41 self.poll = self.process.poll
41 self.poll = self.process.poll
42 else:
42 else:
43 s = 'The process was already started and has state: %r' % self.state
43 s = 'The process was already started and has state: %r' % self.state
44 raise ProcessStateError(s)
44 raise ProcessStateError(s)
45
45
46 # nose setup/teardown
46 # nose setup/teardown
47
47
48 def setup():
48 def setup():
49 cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest')
49 cluster_dir = os.path.join(get_ipython_dir(), 'profile_iptest')
50 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
50 engine_json = os.path.join(cluster_dir, 'security', 'ipcontroller-engine.json')
51 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
51 client_json = os.path.join(cluster_dir, 'security', 'ipcontroller-client.json')
52 for json in (engine_json, client_json):
52 for json in (engine_json, client_json):
53 if os.path.exists(json):
53 if os.path.exists(json):
54 os.remove(json)
54 os.remove(json)
55
55
56 cp = TestProcessLauncher()
56 cp = TestProcessLauncher()
57 cp.cmd_and_args = ipcontroller_cmd_argv + \
57 cp.cmd_and_args = ipcontroller_cmd_argv + \
58 ['--profile=iptest', '--log-level=50', '--ping=250']
58 ['--profile=iptest', '--log-level=50', '--ping=250']
59 cp.start()
59 cp.start()
60 launchers.append(cp)
60 launchers.append(cp)
61 tic = time.time()
61 tic = time.time()
62 while not os.path.exists(engine_json) or not os.path.exists(client_json):
62 while not os.path.exists(engine_json) or not os.path.exists(client_json):
63 if cp.poll() is not None:
63 if cp.poll() is not None:
64 print cp.poll()
64 print cp.poll()
65 raise RuntimeError("The test controller failed to start.")
65 raise RuntimeError("The test controller failed to start.")
66 elif time.time()-tic > 10:
66 elif time.time()-tic > 10:
67 raise RuntimeError("Timeout waiting for the test controller to start.")
67 raise RuntimeError("Timeout waiting for the test controller to start.")
68 time.sleep(0.1)
68 time.sleep(0.1)
69 add_engines(1)
69 add_engines(1)
70
70
71 def add_engines(n=1, profile='iptest'):
71 def add_engines(n=1, profile='iptest', total=False):
72 """add a number of engines to a given profile.
73
74 If total is True, then already running engines are counted, and only
75 the additional engines necessary (if any) are started.
76 """
72 rc = Client(profile=profile)
77 rc = Client(profile=profile)
73 base = len(rc)
78 base = len(rc)
79
80 if total:
81 n = max(n - base, 0)
82
74 eps = []
83 eps = []
75 for i in range(n):
84 for i in range(n):
76 ep = TestProcessLauncher()
85 ep = TestProcessLauncher()
77 ep.cmd_and_args = ipengine_cmd_argv + ['--profile=%s'%profile, '--log-level=50']
86 ep.cmd_and_args = ipengine_cmd_argv + ['--profile=%s'%profile, '--log-level=50']
78 ep.start()
87 ep.start()
79 launchers.append(ep)
88 launchers.append(ep)
80 eps.append(ep)
89 eps.append(ep)
81 tic = time.time()
90 tic = time.time()
82 while len(rc) < base+n:
91 while len(rc) < base+n:
83 if any([ ep.poll() is not None for ep in eps ]):
92 if any([ ep.poll() is not None for ep in eps ]):
84 raise RuntimeError("A test engine failed to start.")
93 raise RuntimeError("A test engine failed to start.")
85 elif time.time()-tic > 10:
94 elif time.time()-tic > 10:
86 raise RuntimeError("Timeout waiting for engines to connect.")
95 raise RuntimeError("Timeout waiting for engines to connect.")
87 time.sleep(.1)
96 time.sleep(.1)
88 rc.spin()
97 rc.spin()
89 rc.close()
98 rc.close()
90 return eps
99 return eps
91
100
92 def teardown():
101 def teardown():
93 time.sleep(1)
102 time.sleep(1)
94 while launchers:
103 while launchers:
95 p = launchers.pop()
104 p = launchers.pop()
96 if p.poll() is None:
105 if p.poll() is None:
97 try:
106 try:
98 p.stop()
107 p.stop()
99 except Exception, e:
108 except Exception, e:
100 print e
109 print e
101 pass
110 pass
102 if p.poll() is None:
111 if p.poll() is None:
103 time.sleep(.25)
112 time.sleep(.25)
104 if p.poll() is None:
113 if p.poll() is None:
105 try:
114 try:
106 print 'cleaning up test process...'
115 print 'cleaning up test process...'
107 p.signal(SIGKILL)
116 p.signal(SIGKILL)
108 except:
117 except:
109 print "couldn't shutdown process: ", p
118 print "couldn't shutdown process: ", p
110 blackhole.close()
119 blackhole.close()
111
120
@@ -1,137 +1,144 b''
1 """base class for parallel client tests
1 """base class for parallel client tests
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 import sys
15 import sys
16 import tempfile
16 import tempfile
17 import time
17 import time
18
18
19 from nose import SkipTest
19 from nose import SkipTest
20
20
21 import zmq
21 import zmq
22 from zmq.tests import BaseZMQTestCase
22 from zmq.tests import BaseZMQTestCase
23
23
24 from IPython.external.decorator import decorator
24 from IPython.external.decorator import decorator
25
25
26 from IPython.parallel import error
26 from IPython.parallel import error
27 from IPython.parallel import Client
27 from IPython.parallel import Client
28
28
29 from IPython.parallel.tests import launchers, add_engines
29 from IPython.parallel.tests import launchers, add_engines
30
30
31 # simple tasks for use in apply tests
31 # simple tasks for use in apply tests
32
32
33 def segfault():
33 def segfault():
34 """this will segfault"""
34 """this will segfault"""
35 import ctypes
35 import ctypes
36 ctypes.memset(-1,0,1)
36 ctypes.memset(-1,0,1)
37
37
38 def crash():
38 def crash():
39 """from stdlib crashers in the test suite"""
39 """from stdlib crashers in the test suite"""
40 import types
40 import types
41 if sys.platform.startswith('win'):
41 if sys.platform.startswith('win'):
42 import ctypes
42 import ctypes
43 ctypes.windll.kernel32.SetErrorMode(0x0002);
43 ctypes.windll.kernel32.SetErrorMode(0x0002);
44 args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b'']
44 args = [ 0, 0, 0, 0, b'\x04\x71\x00\x00', (), (), (), '', '', 1, b'']
45 if sys.version_info[0] >= 3:
45 if sys.version_info[0] >= 3:
46 # Python3 adds 'kwonlyargcount' as the second argument to Code
46 # Python3 adds 'kwonlyargcount' as the second argument to Code
47 args.insert(1, 0)
47 args.insert(1, 0)
48
48
49 co = types.CodeType(*args)
49 co = types.CodeType(*args)
50 exec(co)
50 exec(co)
51
51
52 def wait(n):
52 def wait(n):
53 """sleep for a time"""
53 """sleep for a time"""
54 import time
54 import time
55 time.sleep(n)
55 time.sleep(n)
56 return n
56 return n
57
57
58 def raiser(eclass):
58 def raiser(eclass):
59 """raise an exception"""
59 """raise an exception"""
60 raise eclass()
60 raise eclass()
61
61
62 # test decorator for skipping tests when libraries are unavailable
62 # test decorator for skipping tests when libraries are unavailable
63 def skip_without(*names):
63 def skip_without(*names):
64 """skip a test if some names are not importable"""
64 """skip a test if some names are not importable"""
65 @decorator
65 @decorator
66 def skip_without_names(f, *args, **kwargs):
66 def skip_without_names(f, *args, **kwargs):
67 """decorator to skip tests in the absence of numpy."""
67 """decorator to skip tests in the absence of numpy."""
68 for name in names:
68 for name in names:
69 try:
69 try:
70 __import__(name)
70 __import__(name)
71 except ImportError:
71 except ImportError:
72 raise SkipTest
72 raise SkipTest
73 return f(*args, **kwargs)
73 return f(*args, **kwargs)
74 return skip_without_names
74 return skip_without_names
75
75
76 class ClusterTestCase(BaseZMQTestCase):
76 class ClusterTestCase(BaseZMQTestCase):
77
77
78 def add_engines(self, n=1, block=True):
78 def add_engines(self, n=1, block=True):
79 """add multiple engines to our cluster"""
79 """add multiple engines to our cluster"""
80 self.engines.extend(add_engines(n))
80 self.engines.extend(add_engines(n))
81 if block:
81 if block:
82 self.wait_on_engines()
82 self.wait_on_engines()
83
84 def minimum_engines(self, n=1, block=True):
85 """add engines until there are at least n connected"""
86 self.engines.extend(add_engines(n, total=True))
87 if block:
88 self.wait_on_engines()
89
83
90
84 def wait_on_engines(self, timeout=5):
91 def wait_on_engines(self, timeout=5):
85 """wait for our engines to connect."""
92 """wait for our engines to connect."""
86 n = len(self.engines)+self.base_engine_count
93 n = len(self.engines)+self.base_engine_count
87 tic = time.time()
94 tic = time.time()
88 while time.time()-tic < timeout and len(self.client.ids) < n:
95 while time.time()-tic < timeout and len(self.client.ids) < n:
89 time.sleep(0.1)
96 time.sleep(0.1)
90
97
91 assert not len(self.client.ids) < n, "waiting for engines timed out"
98 assert not len(self.client.ids) < n, "waiting for engines timed out"
92
99
93 def connect_client(self):
100 def connect_client(self):
94 """connect a client with my Context, and track its sockets for cleanup"""
101 """connect a client with my Context, and track its sockets for cleanup"""
95 c = Client(profile='iptest', context=self.context)
102 c = Client(profile='iptest', context=self.context)
96 for name in filter(lambda n:n.endswith('socket'), dir(c)):
103 for name in filter(lambda n:n.endswith('socket'), dir(c)):
97 s = getattr(c, name)
104 s = getattr(c, name)
98 s.setsockopt(zmq.LINGER, 0)
105 s.setsockopt(zmq.LINGER, 0)
99 self.sockets.append(s)
106 self.sockets.append(s)
100 return c
107 return c
101
108
102 def assertRaisesRemote(self, etype, f, *args, **kwargs):
109 def assertRaisesRemote(self, etype, f, *args, **kwargs):
103 try:
110 try:
104 try:
111 try:
105 f(*args, **kwargs)
112 f(*args, **kwargs)
106 except error.CompositeError as e:
113 except error.CompositeError as e:
107 e.raise_exception()
114 e.raise_exception()
108 except error.RemoteError as e:
115 except error.RemoteError as e:
109 self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(etype.__name__, e.ename))
116 self.assertEquals(etype.__name__, e.ename, "Should have raised %r, but raised %r"%(etype.__name__, e.ename))
110 else:
117 else:
111 self.fail("should have raised a RemoteError")
118 self.fail("should have raised a RemoteError")
112
119
113 def setUp(self):
120 def setUp(self):
114 BaseZMQTestCase.setUp(self)
121 BaseZMQTestCase.setUp(self)
115 self.client = self.connect_client()
122 self.client = self.connect_client()
116 # start every test with clean engine namespaces:
123 # start every test with clean engine namespaces:
117 self.client.clear(block=True)
124 self.client.clear(block=True)
118 self.base_engine_count=len(self.client.ids)
125 self.base_engine_count=len(self.client.ids)
119 self.engines=[]
126 self.engines=[]
120
127
121 def tearDown(self):
128 def tearDown(self):
122 # self.client.clear(block=True)
129 # self.client.clear(block=True)
123 # close fds:
130 # close fds:
124 for e in filter(lambda e: e.poll() is not None, launchers):
131 for e in filter(lambda e: e.poll() is not None, launchers):
125 launchers.remove(e)
132 launchers.remove(e)
126
133
127 # allow flushing of incoming messages to prevent crash on socket close
134 # allow flushing of incoming messages to prevent crash on socket close
128 self.client.wait(timeout=2)
135 self.client.wait(timeout=2)
129 # time.sleep(2)
136 # time.sleep(2)
130 self.client.spin()
137 self.client.spin()
131 self.client.close()
138 self.client.close()
132 BaseZMQTestCase.tearDown(self)
139 BaseZMQTestCase.tearDown(self)
133 # this will be redundant when pyzmq merges PR #88
140 # this will be redundant when pyzmq merges PR #88
134 # self.context.term()
141 # self.context.term()
135 # print tempfile.TemporaryFile().fileno(),
142 # print tempfile.TemporaryFile().fileno(),
136 # sys.stdout.flush()
143 # sys.stdout.flush()
137 No newline at end of file
144
@@ -1,115 +1,115 b''
1 """Tests for asyncresult.py
1 """Tests for asyncresult.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19
19
20 from IPython.parallel.error import TimeoutError
20 from IPython.parallel.error import TimeoutError
21
21
22 from IPython.parallel.tests import add_engines
22 from IPython.parallel.tests import add_engines
23 from .clienttest import ClusterTestCase
23 from .clienttest import ClusterTestCase
24
24
25 def setup():
25 def setup():
26 add_engines(2)
26 add_engines(2, total=True)
27
27
28 def wait(n):
28 def wait(n):
29 import time
29 import time
30 time.sleep(n)
30 time.sleep(n)
31 return n
31 return n
32
32
33 class AsyncResultTest(ClusterTestCase):
33 class AsyncResultTest(ClusterTestCase):
34
34
35 def test_single_result(self):
35 def test_single_result(self):
36 eid = self.client.ids[-1]
36 eid = self.client.ids[-1]
37 ar = self.client[eid].apply_async(lambda : 42)
37 ar = self.client[eid].apply_async(lambda : 42)
38 self.assertEquals(ar.get(), 42)
38 self.assertEquals(ar.get(), 42)
39 ar = self.client[[eid]].apply_async(lambda : 42)
39 ar = self.client[[eid]].apply_async(lambda : 42)
40 self.assertEquals(ar.get(), [42])
40 self.assertEquals(ar.get(), [42])
41 ar = self.client[-1:].apply_async(lambda : 42)
41 ar = self.client[-1:].apply_async(lambda : 42)
42 self.assertEquals(ar.get(), [42])
42 self.assertEquals(ar.get(), [42])
43
43
44 def test_get_after_done(self):
44 def test_get_after_done(self):
45 ar = self.client[-1].apply_async(lambda : 42)
45 ar = self.client[-1].apply_async(lambda : 42)
46 ar.wait()
46 ar.wait()
47 self.assertTrue(ar.ready())
47 self.assertTrue(ar.ready())
48 self.assertEquals(ar.get(), 42)
48 self.assertEquals(ar.get(), 42)
49 self.assertEquals(ar.get(), 42)
49 self.assertEquals(ar.get(), 42)
50
50
51 def test_get_before_done(self):
51 def test_get_before_done(self):
52 ar = self.client[-1].apply_async(wait, 0.1)
52 ar = self.client[-1].apply_async(wait, 0.1)
53 self.assertRaises(TimeoutError, ar.get, 0)
53 self.assertRaises(TimeoutError, ar.get, 0)
54 ar.wait(0)
54 ar.wait(0)
55 self.assertFalse(ar.ready())
55 self.assertFalse(ar.ready())
56 self.assertEquals(ar.get(), 0.1)
56 self.assertEquals(ar.get(), 0.1)
57
57
58 def test_get_after_error(self):
58 def test_get_after_error(self):
59 ar = self.client[-1].apply_async(lambda : 1/0)
59 ar = self.client[-1].apply_async(lambda : 1/0)
60 ar.wait(10)
60 ar.wait(10)
61 self.assertRaisesRemote(ZeroDivisionError, ar.get)
61 self.assertRaisesRemote(ZeroDivisionError, ar.get)
62 self.assertRaisesRemote(ZeroDivisionError, ar.get)
62 self.assertRaisesRemote(ZeroDivisionError, ar.get)
63 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
63 self.assertRaisesRemote(ZeroDivisionError, ar.get_dict)
64
64
65 def test_get_dict(self):
65 def test_get_dict(self):
66 n = len(self.client)
66 n = len(self.client)
67 ar = self.client[:].apply_async(lambda : 5)
67 ar = self.client[:].apply_async(lambda : 5)
68 self.assertEquals(ar.get(), [5]*n)
68 self.assertEquals(ar.get(), [5]*n)
69 d = ar.get_dict()
69 d = ar.get_dict()
70 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
70 self.assertEquals(sorted(d.keys()), sorted(self.client.ids))
71 for eid,r in d.iteritems():
71 for eid,r in d.iteritems():
72 self.assertEquals(r, 5)
72 self.assertEquals(r, 5)
73
73
74 def test_list_amr(self):
74 def test_list_amr(self):
75 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
75 ar = self.client.load_balanced_view().map_async(wait, [0.1]*5)
76 rlist = list(ar)
76 rlist = list(ar)
77
77
78 def test_getattr(self):
78 def test_getattr(self):
79 ar = self.client[:].apply_async(wait, 0.5)
79 ar = self.client[:].apply_async(wait, 0.5)
80 self.assertRaises(AttributeError, lambda : ar._foo)
80 self.assertRaises(AttributeError, lambda : ar._foo)
81 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
81 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
82 self.assertRaises(AttributeError, lambda : ar.foo)
82 self.assertRaises(AttributeError, lambda : ar.foo)
83 self.assertRaises(AttributeError, lambda : ar.engine_id)
83 self.assertRaises(AttributeError, lambda : ar.engine_id)
84 self.assertFalse(hasattr(ar, '__length_hint__'))
84 self.assertFalse(hasattr(ar, '__length_hint__'))
85 self.assertFalse(hasattr(ar, 'foo'))
85 self.assertFalse(hasattr(ar, 'foo'))
86 self.assertFalse(hasattr(ar, 'engine_id'))
86 self.assertFalse(hasattr(ar, 'engine_id'))
87 ar.get(5)
87 ar.get(5)
88 self.assertRaises(AttributeError, lambda : ar._foo)
88 self.assertRaises(AttributeError, lambda : ar._foo)
89 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
89 self.assertRaises(AttributeError, lambda : ar.__length_hint__())
90 self.assertRaises(AttributeError, lambda : ar.foo)
90 self.assertRaises(AttributeError, lambda : ar.foo)
91 self.assertTrue(isinstance(ar.engine_id, list))
91 self.assertTrue(isinstance(ar.engine_id, list))
92 self.assertEquals(ar.engine_id, ar['engine_id'])
92 self.assertEquals(ar.engine_id, ar['engine_id'])
93 self.assertFalse(hasattr(ar, '__length_hint__'))
93 self.assertFalse(hasattr(ar, '__length_hint__'))
94 self.assertFalse(hasattr(ar, 'foo'))
94 self.assertFalse(hasattr(ar, 'foo'))
95 self.assertTrue(hasattr(ar, 'engine_id'))
95 self.assertTrue(hasattr(ar, 'engine_id'))
96
96
97 def test_getitem(self):
97 def test_getitem(self):
98 ar = self.client[:].apply_async(wait, 0.5)
98 ar = self.client[:].apply_async(wait, 0.5)
99 self.assertRaises(TimeoutError, lambda : ar['foo'])
99 self.assertRaises(TimeoutError, lambda : ar['foo'])
100 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
100 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
101 ar.get(5)
101 ar.get(5)
102 self.assertRaises(KeyError, lambda : ar['foo'])
102 self.assertRaises(KeyError, lambda : ar['foo'])
103 self.assertTrue(isinstance(ar['engine_id'], list))
103 self.assertTrue(isinstance(ar['engine_id'], list))
104 self.assertEquals(ar.engine_id, ar['engine_id'])
104 self.assertEquals(ar.engine_id, ar['engine_id'])
105
105
106 def test_single_result(self):
106 def test_single_result(self):
107 ar = self.client[-1].apply_async(wait, 0.5)
107 ar = self.client[-1].apply_async(wait, 0.5)
108 self.assertRaises(TimeoutError, lambda : ar['foo'])
108 self.assertRaises(TimeoutError, lambda : ar['foo'])
109 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
109 self.assertRaises(TimeoutError, lambda : ar['engine_id'])
110 self.assertTrue(ar.get(5) == 0.5)
110 self.assertTrue(ar.get(5) == 0.5)
111 self.assertTrue(isinstance(ar['engine_id'], int))
111 self.assertTrue(isinstance(ar['engine_id'], int))
112 self.assertTrue(isinstance(ar.engine_id, int))
112 self.assertTrue(isinstance(ar.engine_id, int))
113 self.assertEquals(ar.engine_id, ar['engine_id'])
113 self.assertEquals(ar.engine_id, ar['engine_id'])
114
114
115
115
@@ -1,324 +1,319 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython.parallel.client import client as clientmod
27 from IPython.parallel.client import client as clientmod
28 from IPython.parallel import error
28 from IPython.parallel import error
29 from IPython.parallel import AsyncResult, AsyncHubResult
29 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import LoadBalancedView, DirectView
30 from IPython.parallel import LoadBalancedView, DirectView
31
31
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33
33
34 def setup():
34 def setup():
35 add_engines(4)
35 add_engines(4, total=True)
36
36
37 class TestClient(ClusterTestCase):
37 class TestClient(ClusterTestCase):
38
38
39 def test_ids(self):
39 def test_ids(self):
40 n = len(self.client.ids)
40 n = len(self.client.ids)
41 self.add_engines(3)
41 self.add_engines(2)
42 self.assertEquals(len(self.client.ids), n+3)
42 self.assertEquals(len(self.client.ids), n+2)
43
43
44 def test_view_indexing(self):
44 def test_view_indexing(self):
45 """test index access for views"""
45 """test index access for views"""
46 self.add_engines(2)
46 self.minimum_engines(4)
47 targets = self.client._build_targets('all')[-1]
47 targets = self.client._build_targets('all')[-1]
48 v = self.client[:]
48 v = self.client[:]
49 self.assertEquals(v.targets, targets)
49 self.assertEquals(v.targets, targets)
50 t = self.client.ids[2]
50 t = self.client.ids[2]
51 v = self.client[t]
51 v = self.client[t]
52 self.assert_(isinstance(v, DirectView))
52 self.assert_(isinstance(v, DirectView))
53 self.assertEquals(v.targets, t)
53 self.assertEquals(v.targets, t)
54 t = self.client.ids[2:4]
54 t = self.client.ids[2:4]
55 v = self.client[t]
55 v = self.client[t]
56 self.assert_(isinstance(v, DirectView))
56 self.assert_(isinstance(v, DirectView))
57 self.assertEquals(v.targets, t)
57 self.assertEquals(v.targets, t)
58 v = self.client[::2]
58 v = self.client[::2]
59 self.assert_(isinstance(v, DirectView))
59 self.assert_(isinstance(v, DirectView))
60 self.assertEquals(v.targets, targets[::2])
60 self.assertEquals(v.targets, targets[::2])
61 v = self.client[1::3]
61 v = self.client[1::3]
62 self.assert_(isinstance(v, DirectView))
62 self.assert_(isinstance(v, DirectView))
63 self.assertEquals(v.targets, targets[1::3])
63 self.assertEquals(v.targets, targets[1::3])
64 v = self.client[:-3]
64 v = self.client[:-3]
65 self.assert_(isinstance(v, DirectView))
65 self.assert_(isinstance(v, DirectView))
66 self.assertEquals(v.targets, targets[:-3])
66 self.assertEquals(v.targets, targets[:-3])
67 v = self.client[-1]
67 v = self.client[-1]
68 self.assert_(isinstance(v, DirectView))
68 self.assert_(isinstance(v, DirectView))
69 self.assertEquals(v.targets, targets[-1])
69 self.assertEquals(v.targets, targets[-1])
70 self.assertRaises(TypeError, lambda : self.client[None])
70 self.assertRaises(TypeError, lambda : self.client[None])
71
71
72 def test_lbview_targets(self):
72 def test_lbview_targets(self):
73 """test load_balanced_view targets"""
73 """test load_balanced_view targets"""
74 v = self.client.load_balanced_view()
74 v = self.client.load_balanced_view()
75 self.assertEquals(v.targets, None)
75 self.assertEquals(v.targets, None)
76 v = self.client.load_balanced_view(-1)
76 v = self.client.load_balanced_view(-1)
77 self.assertEquals(v.targets, [self.client.ids[-1]])
77 self.assertEquals(v.targets, [self.client.ids[-1]])
78 v = self.client.load_balanced_view('all')
78 v = self.client.load_balanced_view('all')
79 self.assertEquals(v.targets, None)
79 self.assertEquals(v.targets, None)
80
80
81 def test_dview_targets(self):
81 def test_dview_targets(self):
82 """test direct_view targets"""
82 """test direct_view targets"""
83 v = self.client.direct_view()
83 v = self.client.direct_view()
84 self.assertEquals(v.targets, 'all')
84 self.assertEquals(v.targets, 'all')
85 v = self.client.direct_view('all')
85 v = self.client.direct_view('all')
86 self.assertEquals(v.targets, 'all')
86 self.assertEquals(v.targets, 'all')
87 v = self.client.direct_view(-1)
87 v = self.client.direct_view(-1)
88 self.assertEquals(v.targets, self.client.ids[-1])
88 self.assertEquals(v.targets, self.client.ids[-1])
89
89
90 def test_lazy_all_targets(self):
90 def test_lazy_all_targets(self):
91 """test lazy evaluation of rc.direct_view('all')"""
91 """test lazy evaluation of rc.direct_view('all')"""
92 v = self.client.direct_view()
92 v = self.client.direct_view()
93 self.assertEquals(v.targets, 'all')
93 self.assertEquals(v.targets, 'all')
94
94
95 def double(x):
95 def double(x):
96 return x*2
96 return x*2
97 seq = range(100)
97 seq = range(100)
98 ref = [ double(x) for x in seq ]
98 ref = [ double(x) for x in seq ]
99
99
100 # add some engines, which should be used
100 # add some engines, which should be used
101 self.add_engines(2)
101 self.add_engines(1)
102 n1 = len(self.client.ids)
102 n1 = len(self.client.ids)
103
103
104 # simple apply
104 # simple apply
105 r = v.apply_sync(lambda : 1)
105 r = v.apply_sync(lambda : 1)
106 self.assertEquals(r, [1] * n1)
106 self.assertEquals(r, [1] * n1)
107
107
108 # map goes through remotefunction
108 # map goes through remotefunction
109 r = v.map_sync(double, seq)
109 r = v.map_sync(double, seq)
110 self.assertEquals(r, ref)
110 self.assertEquals(r, ref)
111
111
112 # add a couple more engines, and try again
112 # add a couple more engines, and try again
113 self.add_engines(2)
113 self.add_engines(2)
114 n2 = len(self.client.ids)
114 n2 = len(self.client.ids)
115 self.assertNotEquals(n2, n1)
115 self.assertNotEquals(n2, n1)
116
116
117 # apply
117 # apply
118 r = v.apply_sync(lambda : 1)
118 r = v.apply_sync(lambda : 1)
119 self.assertEquals(r, [1] * n2)
119 self.assertEquals(r, [1] * n2)
120
120
121 # map
121 # map
122 r = v.map_sync(double, seq)
122 r = v.map_sync(double, seq)
123 self.assertEquals(r, ref)
123 self.assertEquals(r, ref)
124
124
125 def test_targets(self):
125 def test_targets(self):
126 """test various valid targets arguments"""
126 """test various valid targets arguments"""
127 build = self.client._build_targets
127 build = self.client._build_targets
128 ids = self.client.ids
128 ids = self.client.ids
129 idents,targets = build(None)
129 idents,targets = build(None)
130 self.assertEquals(ids, targets)
130 self.assertEquals(ids, targets)
131
131
132 def test_clear(self):
132 def test_clear(self):
133 """test clear behavior"""
133 """test clear behavior"""
134 # self.add_engines(2)
134 self.minimum_engines(2)
135 v = self.client[:]
135 v = self.client[:]
136 v.block=True
136 v.block=True
137 v.push(dict(a=5))
137 v.push(dict(a=5))
138 v.pull('a')
138 v.pull('a')
139 id0 = self.client.ids[-1]
139 id0 = self.client.ids[-1]
140 self.client.clear(targets=id0, block=True)
140 self.client.clear(targets=id0, block=True)
141 a = self.client[:-1].get('a')
141 a = self.client[:-1].get('a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.client.clear(block=True)
143 self.client.clear(block=True)
144 for i in self.client.ids:
144 for i in self.client.ids:
145 # print i
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147
146
148 def test_get_result(self):
147 def test_get_result(self):
149 """test getting results from the Hub."""
148 """test getting results from the Hub."""
150 c = clientmod.Client(profile='iptest')
149 c = clientmod.Client(profile='iptest')
151 # self.add_engines(1)
152 t = c.ids[-1]
150 t = c.ids[-1]
153 ar = c[t].apply_async(wait, 1)
151 ar = c[t].apply_async(wait, 1)
154 # give the monitor time to notice the message
152 # give the monitor time to notice the message
155 time.sleep(.25)
153 time.sleep(.25)
156 ahr = self.client.get_result(ar.msg_ids)
154 ahr = self.client.get_result(ar.msg_ids)
157 self.assertTrue(isinstance(ahr, AsyncHubResult))
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
158 self.assertEquals(ahr.get(), ar.get())
156 self.assertEquals(ahr.get(), ar.get())
159 ar2 = self.client.get_result(ar.msg_ids)
157 ar2 = self.client.get_result(ar.msg_ids)
160 self.assertFalse(isinstance(ar2, AsyncHubResult))
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
161 c.close()
159 c.close()
162
160
163 def test_ids_list(self):
161 def test_ids_list(self):
164 """test client.ids"""
162 """test client.ids"""
165 # self.add_engines(2)
166 ids = self.client.ids
163 ids = self.client.ids
167 self.assertEquals(ids, self.client._ids)
164 self.assertEquals(ids, self.client._ids)
168 self.assertFalse(ids is self.client._ids)
165 self.assertFalse(ids is self.client._ids)
169 ids.remove(ids[-1])
166 ids.remove(ids[-1])
170 self.assertNotEquals(ids, self.client._ids)
167 self.assertNotEquals(ids, self.client._ids)
171
168
172 def test_queue_status(self):
169 def test_queue_status(self):
173 # self.addEngine(4)
174 ids = self.client.ids
170 ids = self.client.ids
175 id0 = ids[0]
171 id0 = ids[0]
176 qs = self.client.queue_status(targets=id0)
172 qs = self.client.queue_status(targets=id0)
177 self.assertTrue(isinstance(qs, dict))
173 self.assertTrue(isinstance(qs, dict))
178 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
174 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
179 allqs = self.client.queue_status()
175 allqs = self.client.queue_status()
180 self.assertTrue(isinstance(allqs, dict))
176 self.assertTrue(isinstance(allqs, dict))
181 intkeys = list(allqs.keys())
177 intkeys = list(allqs.keys())
182 intkeys.remove('unassigned')
178 intkeys.remove('unassigned')
183 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
179 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
184 unassigned = allqs.pop('unassigned')
180 unassigned = allqs.pop('unassigned')
185 for eid,qs in allqs.items():
181 for eid,qs in allqs.items():
186 self.assertTrue(isinstance(qs, dict))
182 self.assertTrue(isinstance(qs, dict))
187 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
183 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
188
184
189 def test_shutdown(self):
185 def test_shutdown(self):
190 # self.addEngine(4)
191 ids = self.client.ids
186 ids = self.client.ids
192 id0 = ids[0]
187 id0 = ids[0]
193 self.client.shutdown(id0, block=True)
188 self.client.shutdown(id0, block=True)
194 while id0 in self.client.ids:
189 while id0 in self.client.ids:
195 time.sleep(0.1)
190 time.sleep(0.1)
196 self.client.spin()
191 self.client.spin()
197
192
198 self.assertRaises(IndexError, lambda : self.client[id0])
193 self.assertRaises(IndexError, lambda : self.client[id0])
199
194
200 def test_result_status(self):
195 def test_result_status(self):
201 pass
196 pass
202 # to be written
197 # to be written
203
198
204 def test_db_query_dt(self):
199 def test_db_query_dt(self):
205 """test db query by date"""
200 """test db query by date"""
206 hist = self.client.hub_history()
201 hist = self.client.hub_history()
207 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
202 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
208 tic = middle['submitted']
203 tic = middle['submitted']
209 before = self.client.db_query({'submitted' : {'$lt' : tic}})
204 before = self.client.db_query({'submitted' : {'$lt' : tic}})
210 after = self.client.db_query({'submitted' : {'$gte' : tic}})
205 after = self.client.db_query({'submitted' : {'$gte' : tic}})
211 self.assertEquals(len(before)+len(after),len(hist))
206 self.assertEquals(len(before)+len(after),len(hist))
212 for b in before:
207 for b in before:
213 self.assertTrue(b['submitted'] < tic)
208 self.assertTrue(b['submitted'] < tic)
214 for a in after:
209 for a in after:
215 self.assertTrue(a['submitted'] >= tic)
210 self.assertTrue(a['submitted'] >= tic)
216 same = self.client.db_query({'submitted' : tic})
211 same = self.client.db_query({'submitted' : tic})
217 for s in same:
212 for s in same:
218 self.assertTrue(s['submitted'] == tic)
213 self.assertTrue(s['submitted'] == tic)
219
214
220 def test_db_query_keys(self):
215 def test_db_query_keys(self):
221 """test extracting subset of record keys"""
216 """test extracting subset of record keys"""
222 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
217 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
223 for rec in found:
218 for rec in found:
224 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
219 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
225
220
226 def test_db_query_default_keys(self):
221 def test_db_query_default_keys(self):
227 """default db_query excludes buffers"""
222 """default db_query excludes buffers"""
228 found = self.client.db_query({'msg_id': {'$ne' : ''}})
223 found = self.client.db_query({'msg_id': {'$ne' : ''}})
229 for rec in found:
224 for rec in found:
230 keys = set(rec.keys())
225 keys = set(rec.keys())
231 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
226 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
232 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
227 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
233
228
234 def test_db_query_msg_id(self):
229 def test_db_query_msg_id(self):
235 """ensure msg_id is always in db queries"""
230 """ensure msg_id is always in db queries"""
236 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
231 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
237 for rec in found:
232 for rec in found:
238 self.assertTrue('msg_id' in rec.keys())
233 self.assertTrue('msg_id' in rec.keys())
239 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
234 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
240 for rec in found:
235 for rec in found:
241 self.assertTrue('msg_id' in rec.keys())
236 self.assertTrue('msg_id' in rec.keys())
242 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
243 for rec in found:
238 for rec in found:
244 self.assertTrue('msg_id' in rec.keys())
239 self.assertTrue('msg_id' in rec.keys())
245
240
246 def test_db_query_in(self):
241 def test_db_query_in(self):
247 """test db query with '$in','$nin' operators"""
242 """test db query with '$in','$nin' operators"""
248 hist = self.client.hub_history()
243 hist = self.client.hub_history()
249 even = hist[::2]
244 even = hist[::2]
250 odd = hist[1::2]
245 odd = hist[1::2]
251 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
246 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
252 found = [ r['msg_id'] for r in recs ]
247 found = [ r['msg_id'] for r in recs ]
253 self.assertEquals(set(even), set(found))
248 self.assertEquals(set(even), set(found))
254 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
249 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
255 found = [ r['msg_id'] for r in recs ]
250 found = [ r['msg_id'] for r in recs ]
256 self.assertEquals(set(odd), set(found))
251 self.assertEquals(set(odd), set(found))
257
252
258 def test_hub_history(self):
253 def test_hub_history(self):
259 hist = self.client.hub_history()
254 hist = self.client.hub_history()
260 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
255 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
261 recdict = {}
256 recdict = {}
262 for rec in recs:
257 for rec in recs:
263 recdict[rec['msg_id']] = rec
258 recdict[rec['msg_id']] = rec
264
259
265 latest = datetime(1984,1,1)
260 latest = datetime(1984,1,1)
266 for msg_id in hist:
261 for msg_id in hist:
267 rec = recdict[msg_id]
262 rec = recdict[msg_id]
268 newt = rec['submitted']
263 newt = rec['submitted']
269 self.assertTrue(newt >= latest)
264 self.assertTrue(newt >= latest)
270 latest = newt
265 latest = newt
271 ar = self.client[-1].apply_async(lambda : 1)
266 ar = self.client[-1].apply_async(lambda : 1)
272 ar.get()
267 ar.get()
273 time.sleep(0.25)
268 time.sleep(0.25)
274 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
269 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
275
270
276 def test_resubmit(self):
271 def test_resubmit(self):
277 def f():
272 def f():
278 import random
273 import random
279 return random.random()
274 return random.random()
280 v = self.client.load_balanced_view()
275 v = self.client.load_balanced_view()
281 ar = v.apply_async(f)
276 ar = v.apply_async(f)
282 r1 = ar.get(1)
277 r1 = ar.get(1)
283 # give the Hub a chance to notice:
278 # give the Hub a chance to notice:
284 time.sleep(0.5)
279 time.sleep(0.5)
285 ahr = self.client.resubmit(ar.msg_ids)
280 ahr = self.client.resubmit(ar.msg_ids)
286 r2 = ahr.get(1)
281 r2 = ahr.get(1)
287 self.assertFalse(r1 == r2)
282 self.assertFalse(r1 == r2)
288
283
289 def test_resubmit_inflight(self):
284 def test_resubmit_inflight(self):
290 """ensure ValueError on resubmit of inflight task"""
285 """ensure ValueError on resubmit of inflight task"""
291 v = self.client.load_balanced_view()
286 v = self.client.load_balanced_view()
292 ar = v.apply_async(time.sleep,1)
287 ar = v.apply_async(time.sleep,1)
293 # give the message a chance to arrive
288 # give the message a chance to arrive
294 time.sleep(0.2)
289 time.sleep(0.2)
295 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
290 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
296 ar.get(2)
291 ar.get(2)
297
292
298 def test_resubmit_badkey(self):
293 def test_resubmit_badkey(self):
299 """ensure KeyError on resubmit of nonexistant task"""
294 """ensure KeyError on resubmit of nonexistant task"""
300 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
295 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
301
296
302 def test_purge_results(self):
297 def test_purge_results(self):
303 # ensure there are some tasks
298 # ensure there are some tasks
304 for i in range(5):
299 for i in range(5):
305 self.client[:].apply_sync(lambda : 1)
300 self.client[:].apply_sync(lambda : 1)
306 # Wait for the Hub to realise the result is done:
301 # Wait for the Hub to realise the result is done:
307 # This prevents a race condition, where we
302 # This prevents a race condition, where we
308 # might purge a result the Hub still thinks is pending.
303 # might purge a result the Hub still thinks is pending.
309 time.sleep(0.1)
304 time.sleep(0.1)
310 rc2 = clientmod.Client(profile='iptest')
305 rc2 = clientmod.Client(profile='iptest')
311 hist = self.client.hub_history()
306 hist = self.client.hub_history()
312 ahr = rc2.get_result([hist[-1]])
307 ahr = rc2.get_result([hist[-1]])
313 ahr.wait(10)
308 ahr.wait(10)
314 self.client.purge_results(hist[-1])
309 self.client.purge_results(hist[-1])
315 newhist = self.client.hub_history()
310 newhist = self.client.hub_history()
316 self.assertEquals(len(newhist)+1,len(hist))
311 self.assertEquals(len(newhist)+1,len(hist))
317 rc2.spin()
312 rc2.spin()
318 rc2.close()
313 rc2.close()
319
314
320 def test_purge_all_results(self):
315 def test_purge_all_results(self):
321 self.client.purge_results('all')
316 self.client.purge_results('all')
322 hist = self.client.hub_history()
317 hist = self.client.hub_history()
323 self.assertEquals(len(hist), 0)
318 self.assertEquals(len(hist), 0)
324
319
@@ -1,106 +1,106 b''
1 """Tests for dependency.py
1 """Tests for dependency.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 __docformat__ = "restructuredtext en"
8 __docformat__ = "restructuredtext en"
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Copyright (C) 2011 The IPython Development Team
11 # Copyright (C) 2011 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16
16
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18 # Imports
18 # Imports
19 #-------------------------------------------------------------------------------
19 #-------------------------------------------------------------------------------
20
20
21 # import
21 # import
22 import os
22 import os
23
23
24 from IPython.utils.pickleutil import can, uncan
24 from IPython.utils.pickleutil import can, uncan
25
25
26 import IPython.parallel as pmod
26 import IPython.parallel as pmod
27 from IPython.parallel.util import interactive
27 from IPython.parallel.util import interactive
28
28
29 from IPython.parallel.tests import add_engines
29 from IPython.parallel.tests import add_engines
30 from .clienttest import ClusterTestCase
30 from .clienttest import ClusterTestCase
31
31
32 def setup():
32 def setup():
33 add_engines(1)
33 add_engines(1, total=True)
34
34
35 @pmod.require('time')
35 @pmod.require('time')
36 def wait(n):
36 def wait(n):
37 time.sleep(n)
37 time.sleep(n)
38 return n
38 return n
39
39
40 mixed = map(str, range(10))
40 mixed = map(str, range(10))
41 completed = map(str, range(0,10,2))
41 completed = map(str, range(0,10,2))
42 failed = map(str, range(1,10,2))
42 failed = map(str, range(1,10,2))
43
43
44 class DependencyTest(ClusterTestCase):
44 class DependencyTest(ClusterTestCase):
45
45
46 def setUp(self):
46 def setUp(self):
47 ClusterTestCase.setUp(self)
47 ClusterTestCase.setUp(self)
48 self.user_ns = {'__builtins__' : __builtins__}
48 self.user_ns = {'__builtins__' : __builtins__}
49 self.view = self.client.load_balanced_view()
49 self.view = self.client.load_balanced_view()
50 self.dview = self.client[-1]
50 self.dview = self.client[-1]
51 self.succeeded = set(map(str, range(0,25,2)))
51 self.succeeded = set(map(str, range(0,25,2)))
52 self.failed = set(map(str, range(1,25,2)))
52 self.failed = set(map(str, range(1,25,2)))
53
53
54 def assertMet(self, dep):
54 def assertMet(self, dep):
55 self.assertTrue(dep.check(self.succeeded, self.failed), "Dependency should be met")
55 self.assertTrue(dep.check(self.succeeded, self.failed), "Dependency should be met")
56
56
57 def assertUnmet(self, dep):
57 def assertUnmet(self, dep):
58 self.assertFalse(dep.check(self.succeeded, self.failed), "Dependency should not be met")
58 self.assertFalse(dep.check(self.succeeded, self.failed), "Dependency should not be met")
59
59
60 def assertUnreachable(self, dep):
60 def assertUnreachable(self, dep):
61 self.assertTrue(dep.unreachable(self.succeeded, self.failed), "Dependency should be unreachable")
61 self.assertTrue(dep.unreachable(self.succeeded, self.failed), "Dependency should be unreachable")
62
62
63 def assertReachable(self, dep):
63 def assertReachable(self, dep):
64 self.assertFalse(dep.unreachable(self.succeeded, self.failed), "Dependency should be reachable")
64 self.assertFalse(dep.unreachable(self.succeeded, self.failed), "Dependency should be reachable")
65
65
66 def cancan(self, f):
66 def cancan(self, f):
67 """decorator to pass through canning into self.user_ns"""
67 """decorator to pass through canning into self.user_ns"""
68 return uncan(can(f), self.user_ns)
68 return uncan(can(f), self.user_ns)
69
69
70 def test_require_imports(self):
70 def test_require_imports(self):
71 """test that @require imports names"""
71 """test that @require imports names"""
72 @self.cancan
72 @self.cancan
73 @pmod.require('urllib')
73 @pmod.require('urllib')
74 @interactive
74 @interactive
75 def encode(dikt):
75 def encode(dikt):
76 return urllib.urlencode(dikt)
76 return urllib.urlencode(dikt)
77 # must pass through canning to properly connect namespaces
77 # must pass through canning to properly connect namespaces
78 self.assertEquals(encode(dict(a=5)), 'a=5')
78 self.assertEquals(encode(dict(a=5)), 'a=5')
79
79
80 def test_success_only(self):
80 def test_success_only(self):
81 dep = pmod.Dependency(mixed, success=True, failure=False)
81 dep = pmod.Dependency(mixed, success=True, failure=False)
82 self.assertUnmet(dep)
82 self.assertUnmet(dep)
83 self.assertUnreachable(dep)
83 self.assertUnreachable(dep)
84 dep.all=False
84 dep.all=False
85 self.assertMet(dep)
85 self.assertMet(dep)
86 self.assertReachable(dep)
86 self.assertReachable(dep)
87 dep = pmod.Dependency(completed, success=True, failure=False)
87 dep = pmod.Dependency(completed, success=True, failure=False)
88 self.assertMet(dep)
88 self.assertMet(dep)
89 self.assertReachable(dep)
89 self.assertReachable(dep)
90 dep.all=False
90 dep.all=False
91 self.assertMet(dep)
91 self.assertMet(dep)
92 self.assertReachable(dep)
92 self.assertReachable(dep)
93
93
94 def test_failure_only(self):
94 def test_failure_only(self):
95 dep = pmod.Dependency(mixed, success=False, failure=True)
95 dep = pmod.Dependency(mixed, success=False, failure=True)
96 self.assertUnmet(dep)
96 self.assertUnmet(dep)
97 self.assertUnreachable(dep)
97 self.assertUnreachable(dep)
98 dep.all=False
98 dep.all=False
99 self.assertMet(dep)
99 self.assertMet(dep)
100 self.assertReachable(dep)
100 self.assertReachable(dep)
101 dep = pmod.Dependency(completed, success=False, failure=True)
101 dep = pmod.Dependency(completed, success=False, failure=True)
102 self.assertUnmet(dep)
102 self.assertUnmet(dep)
103 self.assertUnreachable(dep)
103 self.assertUnreachable(dep)
104 dep.all=False
104 dep.all=False
105 self.assertUnmet(dep)
105 self.assertUnmet(dep)
106 self.assertUnreachable(dep)
106 self.assertUnreachable(dep)
@@ -1,178 +1,176 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test LoadBalancedView objects
2 """test LoadBalancedView objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21
21
22 import zmq
22 import zmq
23 from nose import SkipTest
23 from nose import SkipTest
24
24
25 from IPython import parallel as pmod
25 from IPython import parallel as pmod
26 from IPython.parallel import error
26 from IPython.parallel import error
27
27
28 from IPython.parallel.tests import add_engines
28 from IPython.parallel.tests import add_engines
29
29
30 from .clienttest import ClusterTestCase, crash, wait, skip_without
30 from .clienttest import ClusterTestCase, crash, wait, skip_without
31
31
32 def setup():
32 def setup():
33 add_engines(3)
33 add_engines(3, total=True)
34
34
35 class TestLoadBalancedView(ClusterTestCase):
35 class TestLoadBalancedView(ClusterTestCase):
36
36
37 def setUp(self):
37 def setUp(self):
38 ClusterTestCase.setUp(self)
38 ClusterTestCase.setUp(self)
39 self.view = self.client.load_balanced_view()
39 self.view = self.client.load_balanced_view()
40
40
41 def test_z_crash_task(self):
41 def test_z_crash_task(self):
42 """test graceful handling of engine death (balanced)"""
42 """test graceful handling of engine death (balanced)"""
43 raise SkipTest("crash tests disabled, due to undesirable crash reports")
43 raise SkipTest("crash tests disabled, due to undesirable crash reports")
44 # self.add_engines(1)
44 # self.add_engines(1)
45 ar = self.view.apply_async(crash)
45 ar = self.view.apply_async(crash)
46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
46 self.assertRaisesRemote(error.EngineError, ar.get, 10)
47 eid = ar.engine_id
47 eid = ar.engine_id
48 tic = time.time()
48 tic = time.time()
49 while eid in self.client.ids and time.time()-tic < 5:
49 while eid in self.client.ids and time.time()-tic < 5:
50 time.sleep(.01)
50 time.sleep(.01)
51 self.client.spin()
51 self.client.spin()
52 self.assertFalse(eid in self.client.ids, "Engine should have died")
52 self.assertFalse(eid in self.client.ids, "Engine should have died")
53
53
54 def test_map(self):
54 def test_map(self):
55 def f(x):
55 def f(x):
56 return x**2
56 return x**2
57 data = range(16)
57 data = range(16)
58 r = self.view.map_sync(f, data)
58 r = self.view.map_sync(f, data)
59 self.assertEquals(r, map(f, data))
59 self.assertEquals(r, map(f, data))
60
60
61 def test_map_unordered(self):
61 def test_map_unordered(self):
62 def f(x):
62 def f(x):
63 return x**2
63 return x**2
64 def slow_f(x):
64 def slow_f(x):
65 import time
65 import time
66 time.sleep(0.05*x)
66 time.sleep(0.05*x)
67 return x**2
67 return x**2
68 data = range(16,0,-1)
68 data = range(16,0,-1)
69 reference = map(f, data)
69 reference = map(f, data)
70
70
71 amr = self.view.map_async(slow_f, data, ordered=False)
71 amr = self.view.map_async(slow_f, data, ordered=False)
72 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
72 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
73 # check individual elements, retrieved as they come
73 # check individual elements, retrieved as they come
74 # list comprehension uses __iter__
74 # list comprehension uses __iter__
75 astheycame = [ r for r in amr ]
75 astheycame = [ r for r in amr ]
76 # Ensure that at least one result came out of order:
76 # Ensure that at least one result came out of order:
77 self.assertNotEquals(astheycame, reference, "should not have preserved order")
77 self.assertNotEquals(astheycame, reference, "should not have preserved order")
78 self.assertEquals(sorted(astheycame, reverse=True), reference, "result corrupted")
78 self.assertEquals(sorted(astheycame, reverse=True), reference, "result corrupted")
79
79
80 def test_map_ordered(self):
80 def test_map_ordered(self):
81 def f(x):
81 def f(x):
82 return x**2
82 return x**2
83 def slow_f(x):
83 def slow_f(x):
84 import time
84 import time
85 time.sleep(0.05*x)
85 time.sleep(0.05*x)
86 return x**2
86 return x**2
87 data = range(16,0,-1)
87 data = range(16,0,-1)
88 reference = map(f, data)
88 reference = map(f, data)
89
89
90 amr = self.view.map_async(slow_f, data)
90 amr = self.view.map_async(slow_f, data)
91 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
91 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
92 # check individual elements, retrieved as they come
92 # check individual elements, retrieved as they come
93 # list(amr) uses __iter__
93 # list(amr) uses __iter__
94 astheycame = list(amr)
94 astheycame = list(amr)
95 # Ensure that results came in order
95 # Ensure that results came in order
96 self.assertEquals(astheycame, reference)
96 self.assertEquals(astheycame, reference)
97 self.assertEquals(amr.result, reference)
97 self.assertEquals(amr.result, reference)
98
98
99 def test_map_iterable(self):
99 def test_map_iterable(self):
100 """test map on iterables (balanced)"""
100 """test map on iterables (balanced)"""
101 view = self.view
101 view = self.view
102 # 101 is prime, so it won't be evenly distributed
102 # 101 is prime, so it won't be evenly distributed
103 arr = range(101)
103 arr = range(101)
104 # so that it will be an iterator, even in Python 3
104 # so that it will be an iterator, even in Python 3
105 it = iter(arr)
105 it = iter(arr)
106 r = view.map_sync(lambda x:x, arr)
106 r = view.map_sync(lambda x:x, arr)
107 self.assertEquals(r, list(arr))
107 self.assertEquals(r, list(arr))
108
108
109
109
110 def test_abort(self):
110 def test_abort(self):
111 view = self.view
111 view = self.view
112 ar = self.client[:].apply_async(time.sleep, .5)
112 ar = self.client[:].apply_async(time.sleep, .5)
113 ar = self.client[:].apply_async(time.sleep, .5)
113 ar = self.client[:].apply_async(time.sleep, .5)
114 time.sleep(0.2)
114 time.sleep(0.2)
115 ar2 = view.apply_async(lambda : 2)
115 ar2 = view.apply_async(lambda : 2)
116 ar3 = view.apply_async(lambda : 3)
116 ar3 = view.apply_async(lambda : 3)
117 view.abort(ar2)
117 view.abort(ar2)
118 view.abort(ar3.msg_ids)
118 view.abort(ar3.msg_ids)
119 self.assertRaises(error.TaskAborted, ar2.get)
119 self.assertRaises(error.TaskAborted, ar2.get)
120 self.assertRaises(error.TaskAborted, ar3.get)
120 self.assertRaises(error.TaskAborted, ar3.get)
121
121
122 def test_retries(self):
122 def test_retries(self):
123 add_engines(3)
124 view = self.view
123 view = self.view
125 view.timeout = 1 # prevent hang if this doesn't behave
124 view.timeout = 1 # prevent hang if this doesn't behave
126 def fail():
125 def fail():
127 assert False
126 assert False
128 for r in range(len(self.client)-1):
127 for r in range(len(self.client)-1):
129 with view.temp_flags(retries=r):
128 with view.temp_flags(retries=r):
130 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
129 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
131
130
132 with view.temp_flags(retries=len(self.client), timeout=0.25):
131 with view.temp_flags(retries=len(self.client), timeout=0.25):
133 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
132 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
134
133
135 def test_invalid_dependency(self):
134 def test_invalid_dependency(self):
136 view = self.view
135 view = self.view
137 with view.temp_flags(after='12345'):
136 with view.temp_flags(after='12345'):
138 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
137 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
139
138
140 def test_impossible_dependency(self):
139 def test_impossible_dependency(self):
141 if len(self.client) < 2:
140 self.minimum_engines(2)
142 add_engines(2)
143 view = self.client.load_balanced_view()
141 view = self.client.load_balanced_view()
144 ar1 = view.apply_async(lambda : 1)
142 ar1 = view.apply_async(lambda : 1)
145 ar1.get()
143 ar1.get()
146 e1 = ar1.engine_id
144 e1 = ar1.engine_id
147 e2 = e1
145 e2 = e1
148 while e2 == e1:
146 while e2 == e1:
149 ar2 = view.apply_async(lambda : 1)
147 ar2 = view.apply_async(lambda : 1)
150 ar2.get()
148 ar2.get()
151 e2 = ar2.engine_id
149 e2 = ar2.engine_id
152
150
153 with view.temp_flags(follow=[ar1, ar2]):
151 with view.temp_flags(follow=[ar1, ar2]):
154 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
152 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
155
153
156
154
157 def test_follow(self):
155 def test_follow(self):
158 ar = self.view.apply_async(lambda : 1)
156 ar = self.view.apply_async(lambda : 1)
159 ar.get()
157 ar.get()
160 ars = []
158 ars = []
161 first_id = ar.engine_id
159 first_id = ar.engine_id
162
160
163 self.view.follow = ar
161 self.view.follow = ar
164 for i in range(5):
162 for i in range(5):
165 ars.append(self.view.apply_async(lambda : 1))
163 ars.append(self.view.apply_async(lambda : 1))
166 self.view.wait(ars)
164 self.view.wait(ars)
167 for ar in ars:
165 for ar in ars:
168 self.assertEquals(ar.engine_id, first_id)
166 self.assertEquals(ar.engine_id, first_id)
169
167
170 def test_after(self):
168 def test_after(self):
171 view = self.view
169 view = self.view
172 ar = view.apply_async(time.sleep, 0.5)
170 ar = view.apply_async(time.sleep, 0.5)
173 with view.temp_flags(after=ar):
171 with view.temp_flags(after=ar):
174 ar2 = view.apply_async(lambda : 1)
172 ar2 = view.apply_async(lambda : 1)
175
173
176 ar.wait()
174 ar.wait()
177 ar2.wait()
175 ar2.wait()
178 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
176 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
@@ -1,507 +1,507 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import time
20 import time
21 from tempfile import mktemp
21 from tempfile import mktemp
22 from StringIO import StringIO
22 from StringIO import StringIO
23
23
24 import zmq
24 import zmq
25 from nose import SkipTest
25 from nose import SkipTest
26
26
27 from IPython.testing import decorators as dec
27 from IPython.testing import decorators as dec
28
28
29 from IPython import parallel as pmod
29 from IPython import parallel as pmod
30 from IPython.parallel import error
30 from IPython.parallel import error
31 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
31 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
32 from IPython.parallel import DirectView
32 from IPython.parallel import DirectView
33 from IPython.parallel.util import interactive
33 from IPython.parallel.util import interactive
34
34
35 from IPython.parallel.tests import add_engines
35 from IPython.parallel.tests import add_engines
36
36
37 from .clienttest import ClusterTestCase, crash, wait, skip_without
37 from .clienttest import ClusterTestCase, crash, wait, skip_without
38
38
39 def setup():
39 def setup():
40 add_engines(3)
40 add_engines(3, total=True)
41
41
42 class TestView(ClusterTestCase):
42 class TestView(ClusterTestCase):
43
43
44 def test_z_crash_mux(self):
44 def test_z_crash_mux(self):
45 """test graceful handling of engine death (direct)"""
45 """test graceful handling of engine death (direct)"""
46 raise SkipTest("crash tests disabled, due to undesirable crash reports")
46 raise SkipTest("crash tests disabled, due to undesirable crash reports")
47 # self.add_engines(1)
47 # self.add_engines(1)
48 eid = self.client.ids[-1]
48 eid = self.client.ids[-1]
49 ar = self.client[eid].apply_async(crash)
49 ar = self.client[eid].apply_async(crash)
50 self.assertRaisesRemote(error.EngineError, ar.get, 10)
50 self.assertRaisesRemote(error.EngineError, ar.get, 10)
51 eid = ar.engine_id
51 eid = ar.engine_id
52 tic = time.time()
52 tic = time.time()
53 while eid in self.client.ids and time.time()-tic < 5:
53 while eid in self.client.ids and time.time()-tic < 5:
54 time.sleep(.01)
54 time.sleep(.01)
55 self.client.spin()
55 self.client.spin()
56 self.assertFalse(eid in self.client.ids, "Engine should have died")
56 self.assertFalse(eid in self.client.ids, "Engine should have died")
57
57
58 def test_push_pull(self):
58 def test_push_pull(self):
59 """test pushing and pulling"""
59 """test pushing and pulling"""
60 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
60 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
61 t = self.client.ids[-1]
61 t = self.client.ids[-1]
62 v = self.client[t]
62 v = self.client[t]
63 push = v.push
63 push = v.push
64 pull = v.pull
64 pull = v.pull
65 v.block=True
65 v.block=True
66 nengines = len(self.client)
66 nengines = len(self.client)
67 push({'data':data})
67 push({'data':data})
68 d = pull('data')
68 d = pull('data')
69 self.assertEquals(d, data)
69 self.assertEquals(d, data)
70 self.client[:].push({'data':data})
70 self.client[:].push({'data':data})
71 d = self.client[:].pull('data', block=True)
71 d = self.client[:].pull('data', block=True)
72 self.assertEquals(d, nengines*[data])
72 self.assertEquals(d, nengines*[data])
73 ar = push({'data':data}, block=False)
73 ar = push({'data':data}, block=False)
74 self.assertTrue(isinstance(ar, AsyncResult))
74 self.assertTrue(isinstance(ar, AsyncResult))
75 r = ar.get()
75 r = ar.get()
76 ar = self.client[:].pull('data', block=False)
76 ar = self.client[:].pull('data', block=False)
77 self.assertTrue(isinstance(ar, AsyncResult))
77 self.assertTrue(isinstance(ar, AsyncResult))
78 r = ar.get()
78 r = ar.get()
79 self.assertEquals(r, nengines*[data])
79 self.assertEquals(r, nengines*[data])
80 self.client[:].push(dict(a=10,b=20))
80 self.client[:].push(dict(a=10,b=20))
81 r = self.client[:].pull(('a','b'), block=True)
81 r = self.client[:].pull(('a','b'), block=True)
82 self.assertEquals(r, nengines*[[10,20]])
82 self.assertEquals(r, nengines*[[10,20]])
83
83
84 def test_push_pull_function(self):
84 def test_push_pull_function(self):
85 "test pushing and pulling functions"
85 "test pushing and pulling functions"
86 def testf(x):
86 def testf(x):
87 return 2.0*x
87 return 2.0*x
88
88
89 t = self.client.ids[-1]
89 t = self.client.ids[-1]
90 v = self.client[t]
90 v = self.client[t]
91 v.block=True
91 v.block=True
92 push = v.push
92 push = v.push
93 pull = v.pull
93 pull = v.pull
94 execute = v.execute
94 execute = v.execute
95 push({'testf':testf})
95 push({'testf':testf})
96 r = pull('testf')
96 r = pull('testf')
97 self.assertEqual(r(1.0), testf(1.0))
97 self.assertEqual(r(1.0), testf(1.0))
98 execute('r = testf(10)')
98 execute('r = testf(10)')
99 r = pull('r')
99 r = pull('r')
100 self.assertEquals(r, testf(10))
100 self.assertEquals(r, testf(10))
101 ar = self.client[:].push({'testf':testf}, block=False)
101 ar = self.client[:].push({'testf':testf}, block=False)
102 ar.get()
102 ar.get()
103 ar = self.client[:].pull('testf', block=False)
103 ar = self.client[:].pull('testf', block=False)
104 rlist = ar.get()
104 rlist = ar.get()
105 for r in rlist:
105 for r in rlist:
106 self.assertEqual(r(1.0), testf(1.0))
106 self.assertEqual(r(1.0), testf(1.0))
107 execute("def g(x): return x*x")
107 execute("def g(x): return x*x")
108 r = pull(('testf','g'))
108 r = pull(('testf','g'))
109 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
109 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
110
110
111 def test_push_function_globals(self):
111 def test_push_function_globals(self):
112 """test that pushed functions have access to globals"""
112 """test that pushed functions have access to globals"""
113 @interactive
113 @interactive
114 def geta():
114 def geta():
115 return a
115 return a
116 # self.add_engines(1)
116 # self.add_engines(1)
117 v = self.client[-1]
117 v = self.client[-1]
118 v.block=True
118 v.block=True
119 v['f'] = geta
119 v['f'] = geta
120 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
120 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
121 v.execute('a=5')
121 v.execute('a=5')
122 v.execute('b=f()')
122 v.execute('b=f()')
123 self.assertEquals(v['b'], 5)
123 self.assertEquals(v['b'], 5)
124
124
125 def test_push_function_defaults(self):
125 def test_push_function_defaults(self):
126 """test that pushed functions preserve default args"""
126 """test that pushed functions preserve default args"""
127 def echo(a=10):
127 def echo(a=10):
128 return a
128 return a
129 v = self.client[-1]
129 v = self.client[-1]
130 v.block=True
130 v.block=True
131 v['f'] = echo
131 v['f'] = echo
132 v.execute('b=f()')
132 v.execute('b=f()')
133 self.assertEquals(v['b'], 10)
133 self.assertEquals(v['b'], 10)
134
134
135 def test_get_result(self):
135 def test_get_result(self):
136 """test getting results from the Hub."""
136 """test getting results from the Hub."""
137 c = pmod.Client(profile='iptest')
137 c = pmod.Client(profile='iptest')
138 # self.add_engines(1)
138 # self.add_engines(1)
139 t = c.ids[-1]
139 t = c.ids[-1]
140 v = c[t]
140 v = c[t]
141 v2 = self.client[t]
141 v2 = self.client[t]
142 ar = v.apply_async(wait, 1)
142 ar = v.apply_async(wait, 1)
143 # give the monitor time to notice the message
143 # give the monitor time to notice the message
144 time.sleep(.25)
144 time.sleep(.25)
145 ahr = v2.get_result(ar.msg_ids)
145 ahr = v2.get_result(ar.msg_ids)
146 self.assertTrue(isinstance(ahr, AsyncHubResult))
146 self.assertTrue(isinstance(ahr, AsyncHubResult))
147 self.assertEquals(ahr.get(), ar.get())
147 self.assertEquals(ahr.get(), ar.get())
148 ar2 = v2.get_result(ar.msg_ids)
148 ar2 = v2.get_result(ar.msg_ids)
149 self.assertFalse(isinstance(ar2, AsyncHubResult))
149 self.assertFalse(isinstance(ar2, AsyncHubResult))
150 c.spin()
150 c.spin()
151 c.close()
151 c.close()
152
152
153 def test_run_newline(self):
153 def test_run_newline(self):
154 """test that run appends newline to files"""
154 """test that run appends newline to files"""
155 tmpfile = mktemp()
155 tmpfile = mktemp()
156 with open(tmpfile, 'w') as f:
156 with open(tmpfile, 'w') as f:
157 f.write("""def g():
157 f.write("""def g():
158 return 5
158 return 5
159 """)
159 """)
160 v = self.client[-1]
160 v = self.client[-1]
161 v.run(tmpfile, block=True)
161 v.run(tmpfile, block=True)
162 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
162 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
163
163
164 def test_apply_tracked(self):
164 def test_apply_tracked(self):
165 """test tracking for apply"""
165 """test tracking for apply"""
166 # self.add_engines(1)
166 # self.add_engines(1)
167 t = self.client.ids[-1]
167 t = self.client.ids[-1]
168 v = self.client[t]
168 v = self.client[t]
169 v.block=False
169 v.block=False
170 def echo(n=1024*1024, **kwargs):
170 def echo(n=1024*1024, **kwargs):
171 with v.temp_flags(**kwargs):
171 with v.temp_flags(**kwargs):
172 return v.apply(lambda x: x, 'x'*n)
172 return v.apply(lambda x: x, 'x'*n)
173 ar = echo(1, track=False)
173 ar = echo(1, track=False)
174 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
174 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
175 self.assertTrue(ar.sent)
175 self.assertTrue(ar.sent)
176 ar = echo(track=True)
176 ar = echo(track=True)
177 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
177 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
178 self.assertEquals(ar.sent, ar._tracker.done)
178 self.assertEquals(ar.sent, ar._tracker.done)
179 ar._tracker.wait()
179 ar._tracker.wait()
180 self.assertTrue(ar.sent)
180 self.assertTrue(ar.sent)
181
181
182 def test_push_tracked(self):
182 def test_push_tracked(self):
183 t = self.client.ids[-1]
183 t = self.client.ids[-1]
184 ns = dict(x='x'*1024*1024)
184 ns = dict(x='x'*1024*1024)
185 v = self.client[t]
185 v = self.client[t]
186 ar = v.push(ns, block=False, track=False)
186 ar = v.push(ns, block=False, track=False)
187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertTrue(ar.sent)
188 self.assertTrue(ar.sent)
189
189
190 ar = v.push(ns, block=False, track=True)
190 ar = v.push(ns, block=False, track=True)
191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
192 ar._tracker.wait()
192 ar._tracker.wait()
193 self.assertEquals(ar.sent, ar._tracker.done)
193 self.assertEquals(ar.sent, ar._tracker.done)
194 self.assertTrue(ar.sent)
194 self.assertTrue(ar.sent)
195 ar.get()
195 ar.get()
196
196
197 def test_scatter_tracked(self):
197 def test_scatter_tracked(self):
198 t = self.client.ids
198 t = self.client.ids
199 x='x'*1024*1024
199 x='x'*1024*1024
200 ar = self.client[t].scatter('x', x, block=False, track=False)
200 ar = self.client[t].scatter('x', x, block=False, track=False)
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 self.assertTrue(ar.sent)
202 self.assertTrue(ar.sent)
203
203
204 ar = self.client[t].scatter('x', x, block=False, track=True)
204 ar = self.client[t].scatter('x', x, block=False, track=True)
205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
206 self.assertEquals(ar.sent, ar._tracker.done)
206 self.assertEquals(ar.sent, ar._tracker.done)
207 ar._tracker.wait()
207 ar._tracker.wait()
208 self.assertTrue(ar.sent)
208 self.assertTrue(ar.sent)
209 ar.get()
209 ar.get()
210
210
211 def test_remote_reference(self):
211 def test_remote_reference(self):
212 v = self.client[-1]
212 v = self.client[-1]
213 v['a'] = 123
213 v['a'] = 123
214 ra = pmod.Reference('a')
214 ra = pmod.Reference('a')
215 b = v.apply_sync(lambda x: x, ra)
215 b = v.apply_sync(lambda x: x, ra)
216 self.assertEquals(b, 123)
216 self.assertEquals(b, 123)
217
217
218
218
219 def test_scatter_gather(self):
219 def test_scatter_gather(self):
220 view = self.client[:]
220 view = self.client[:]
221 seq1 = range(16)
221 seq1 = range(16)
222 view.scatter('a', seq1)
222 view.scatter('a', seq1)
223 seq2 = view.gather('a', block=True)
223 seq2 = view.gather('a', block=True)
224 self.assertEquals(seq2, seq1)
224 self.assertEquals(seq2, seq1)
225 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
225 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
226
226
227 @skip_without('numpy')
227 @skip_without('numpy')
228 def test_scatter_gather_numpy(self):
228 def test_scatter_gather_numpy(self):
229 import numpy
229 import numpy
230 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
230 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
231 view = self.client[:]
231 view = self.client[:]
232 a = numpy.arange(64)
232 a = numpy.arange(64)
233 view.scatter('a', a)
233 view.scatter('a', a)
234 b = view.gather('a', block=True)
234 b = view.gather('a', block=True)
235 assert_array_equal(b, a)
235 assert_array_equal(b, a)
236
236
237 def test_map(self):
237 def test_map(self):
238 view = self.client[:]
238 view = self.client[:]
239 def f(x):
239 def f(x):
240 return x**2
240 return x**2
241 data = range(16)
241 data = range(16)
242 r = view.map_sync(f, data)
242 r = view.map_sync(f, data)
243 self.assertEquals(r, map(f, data))
243 self.assertEquals(r, map(f, data))
244
244
245 def test_map_iterable(self):
245 def test_map_iterable(self):
246 """test map on iterables (direct)"""
246 """test map on iterables (direct)"""
247 view = self.client[:]
247 view = self.client[:]
248 # 101 is prime, so it won't be evenly distributed
248 # 101 is prime, so it won't be evenly distributed
249 arr = range(101)
249 arr = range(101)
250 # ensure it will be an iterator, even in Python 3
250 # ensure it will be an iterator, even in Python 3
251 it = iter(arr)
251 it = iter(arr)
252 r = view.map_sync(lambda x:x, arr)
252 r = view.map_sync(lambda x:x, arr)
253 self.assertEquals(r, list(arr))
253 self.assertEquals(r, list(arr))
254
254
255 def test_scatterGatherNonblocking(self):
255 def test_scatterGatherNonblocking(self):
256 data = range(16)
256 data = range(16)
257 view = self.client[:]
257 view = self.client[:]
258 view.scatter('a', data, block=False)
258 view.scatter('a', data, block=False)
259 ar = view.gather('a', block=False)
259 ar = view.gather('a', block=False)
260 self.assertEquals(ar.get(), data)
260 self.assertEquals(ar.get(), data)
261
261
262 @skip_without('numpy')
262 @skip_without('numpy')
263 def test_scatter_gather_numpy_nonblocking(self):
263 def test_scatter_gather_numpy_nonblocking(self):
264 import numpy
264 import numpy
265 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
265 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
266 a = numpy.arange(64)
266 a = numpy.arange(64)
267 view = self.client[:]
267 view = self.client[:]
268 ar = view.scatter('a', a, block=False)
268 ar = view.scatter('a', a, block=False)
269 self.assertTrue(isinstance(ar, AsyncResult))
269 self.assertTrue(isinstance(ar, AsyncResult))
270 amr = view.gather('a', block=False)
270 amr = view.gather('a', block=False)
271 self.assertTrue(isinstance(amr, AsyncMapResult))
271 self.assertTrue(isinstance(amr, AsyncMapResult))
272 assert_array_equal(amr.get(), a)
272 assert_array_equal(amr.get(), a)
273
273
274 def test_execute(self):
274 def test_execute(self):
275 view = self.client[:]
275 view = self.client[:]
276 # self.client.debug=True
276 # self.client.debug=True
277 execute = view.execute
277 execute = view.execute
278 ar = execute('c=30', block=False)
278 ar = execute('c=30', block=False)
279 self.assertTrue(isinstance(ar, AsyncResult))
279 self.assertTrue(isinstance(ar, AsyncResult))
280 ar = execute('d=[0,1,2]', block=False)
280 ar = execute('d=[0,1,2]', block=False)
281 self.client.wait(ar, 1)
281 self.client.wait(ar, 1)
282 self.assertEquals(len(ar.get()), len(self.client))
282 self.assertEquals(len(ar.get()), len(self.client))
283 for c in view['c']:
283 for c in view['c']:
284 self.assertEquals(c, 30)
284 self.assertEquals(c, 30)
285
285
286 def test_abort(self):
286 def test_abort(self):
287 view = self.client[-1]
287 view = self.client[-1]
288 ar = view.execute('import time; time.sleep(1)', block=False)
288 ar = view.execute('import time; time.sleep(1)', block=False)
289 ar2 = view.apply_async(lambda : 2)
289 ar2 = view.apply_async(lambda : 2)
290 ar3 = view.apply_async(lambda : 3)
290 ar3 = view.apply_async(lambda : 3)
291 view.abort(ar2)
291 view.abort(ar2)
292 view.abort(ar3.msg_ids)
292 view.abort(ar3.msg_ids)
293 self.assertRaises(error.TaskAborted, ar2.get)
293 self.assertRaises(error.TaskAborted, ar2.get)
294 self.assertRaises(error.TaskAborted, ar3.get)
294 self.assertRaises(error.TaskAborted, ar3.get)
295
295
296 def test_abort_all(self):
296 def test_abort_all(self):
297 """view.abort() aborts all outstanding tasks"""
297 """view.abort() aborts all outstanding tasks"""
298 view = self.client[-1]
298 view = self.client[-1]
299 ars = [ view.apply_async(time.sleep, 1) for i in range(10) ]
299 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
300 view.abort()
300 view.abort()
301 view.wait(timeout=5)
301 view.wait(timeout=5)
302 for ar in ars[5:]:
302 for ar in ars[5:]:
303 self.assertRaises(error.TaskAborted, ar.get)
303 self.assertRaises(error.TaskAborted, ar.get)
304
304
305 def test_temp_flags(self):
305 def test_temp_flags(self):
306 view = self.client[-1]
306 view = self.client[-1]
307 view.block=True
307 view.block=True
308 with view.temp_flags(block=False):
308 with view.temp_flags(block=False):
309 self.assertFalse(view.block)
309 self.assertFalse(view.block)
310 self.assertTrue(view.block)
310 self.assertTrue(view.block)
311
311
312 @dec.known_failure_py3
312 @dec.known_failure_py3
313 def test_importer(self):
313 def test_importer(self):
314 view = self.client[-1]
314 view = self.client[-1]
315 view.clear(block=True)
315 view.clear(block=True)
316 with view.importer:
316 with view.importer:
317 import re
317 import re
318
318
319 @interactive
319 @interactive
320 def findall(pat, s):
320 def findall(pat, s):
321 # this globals() step isn't necessary in real code
321 # this globals() step isn't necessary in real code
322 # only to prevent a closure in the test
322 # only to prevent a closure in the test
323 re = globals()['re']
323 re = globals()['re']
324 return re.findall(pat, s)
324 return re.findall(pat, s)
325
325
326 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
326 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
327
327
328 # parallel magic tests
328 # parallel magic tests
329
329
330 def test_magic_px_blocking(self):
330 def test_magic_px_blocking(self):
331 ip = get_ipython()
331 ip = get_ipython()
332 v = self.client[-1]
332 v = self.client[-1]
333 v.activate()
333 v.activate()
334 v.block=True
334 v.block=True
335
335
336 ip.magic_px('a=5')
336 ip.magic_px('a=5')
337 self.assertEquals(v['a'], 5)
337 self.assertEquals(v['a'], 5)
338 ip.magic_px('a=10')
338 ip.magic_px('a=10')
339 self.assertEquals(v['a'], 10)
339 self.assertEquals(v['a'], 10)
340 sio = StringIO()
340 sio = StringIO()
341 savestdout = sys.stdout
341 savestdout = sys.stdout
342 sys.stdout = sio
342 sys.stdout = sio
343 # just 'print a' worst ~99% of the time, but this ensures that
343 # just 'print a' worst ~99% of the time, but this ensures that
344 # the stdout message has arrived when the result is finished:
344 # the stdout message has arrived when the result is finished:
345 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
345 ip.magic_px('import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)')
346 sys.stdout = savestdout
346 sys.stdout = savestdout
347 buf = sio.getvalue()
347 buf = sio.getvalue()
348 self.assertTrue('[stdout:' in buf, buf)
348 self.assertTrue('[stdout:' in buf, buf)
349 self.assertTrue(buf.rstrip().endswith('10'))
349 self.assertTrue(buf.rstrip().endswith('10'))
350 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
350 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
351
351
352 def test_magic_px_nonblocking(self):
352 def test_magic_px_nonblocking(self):
353 ip = get_ipython()
353 ip = get_ipython()
354 v = self.client[-1]
354 v = self.client[-1]
355 v.activate()
355 v.activate()
356 v.block=False
356 v.block=False
357
357
358 ip.magic_px('a=5')
358 ip.magic_px('a=5')
359 self.assertEquals(v['a'], 5)
359 self.assertEquals(v['a'], 5)
360 ip.magic_px('a=10')
360 ip.magic_px('a=10')
361 self.assertEquals(v['a'], 10)
361 self.assertEquals(v['a'], 10)
362 sio = StringIO()
362 sio = StringIO()
363 savestdout = sys.stdout
363 savestdout = sys.stdout
364 sys.stdout = sio
364 sys.stdout = sio
365 ip.magic_px('print a')
365 ip.magic_px('print a')
366 sys.stdout = savestdout
366 sys.stdout = savestdout
367 buf = sio.getvalue()
367 buf = sio.getvalue()
368 self.assertFalse('[stdout:%i]'%v.targets in buf)
368 self.assertFalse('[stdout:%i]'%v.targets in buf)
369 ip.magic_px('1/0')
369 ip.magic_px('1/0')
370 ar = v.get_result(-1)
370 ar = v.get_result(-1)
371 self.assertRaisesRemote(ZeroDivisionError, ar.get)
371 self.assertRaisesRemote(ZeroDivisionError, ar.get)
372
372
373 def test_magic_autopx_blocking(self):
373 def test_magic_autopx_blocking(self):
374 ip = get_ipython()
374 ip = get_ipython()
375 v = self.client[-1]
375 v = self.client[-1]
376 v.activate()
376 v.activate()
377 v.block=True
377 v.block=True
378
378
379 sio = StringIO()
379 sio = StringIO()
380 savestdout = sys.stdout
380 savestdout = sys.stdout
381 sys.stdout = sio
381 sys.stdout = sio
382 ip.magic_autopx()
382 ip.magic_autopx()
383 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
383 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
384 ip.run_cell('print b')
384 ip.run_cell('print b')
385 ip.run_cell("b/c")
385 ip.run_cell("b/c")
386 ip.run_code(compile('b*=2', '', 'single'))
386 ip.run_code(compile('b*=2', '', 'single'))
387 ip.magic_autopx()
387 ip.magic_autopx()
388 sys.stdout = savestdout
388 sys.stdout = savestdout
389 output = sio.getvalue().strip()
389 output = sio.getvalue().strip()
390 self.assertTrue(output.startswith('%autopx enabled'))
390 self.assertTrue(output.startswith('%autopx enabled'))
391 self.assertTrue(output.endswith('%autopx disabled'))
391 self.assertTrue(output.endswith('%autopx disabled'))
392 self.assertTrue('RemoteError: ZeroDivisionError' in output)
392 self.assertTrue('RemoteError: ZeroDivisionError' in output)
393 ar = v.get_result(-2)
393 ar = v.get_result(-2)
394 self.assertEquals(v['a'], 5)
394 self.assertEquals(v['a'], 5)
395 self.assertEquals(v['b'], 20)
395 self.assertEquals(v['b'], 20)
396 self.assertRaisesRemote(ZeroDivisionError, ar.get)
396 self.assertRaisesRemote(ZeroDivisionError, ar.get)
397
397
398 def test_magic_autopx_nonblocking(self):
398 def test_magic_autopx_nonblocking(self):
399 ip = get_ipython()
399 ip = get_ipython()
400 v = self.client[-1]
400 v = self.client[-1]
401 v.activate()
401 v.activate()
402 v.block=False
402 v.block=False
403
403
404 sio = StringIO()
404 sio = StringIO()
405 savestdout = sys.stdout
405 savestdout = sys.stdout
406 sys.stdout = sio
406 sys.stdout = sio
407 ip.magic_autopx()
407 ip.magic_autopx()
408 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
408 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
409 ip.run_cell('print b')
409 ip.run_cell('print b')
410 ip.run_cell("b/c")
410 ip.run_cell("b/c")
411 ip.run_code(compile('b*=2', '', 'single'))
411 ip.run_code(compile('b*=2', '', 'single'))
412 ip.magic_autopx()
412 ip.magic_autopx()
413 sys.stdout = savestdout
413 sys.stdout = savestdout
414 output = sio.getvalue().strip()
414 output = sio.getvalue().strip()
415 self.assertTrue(output.startswith('%autopx enabled'))
415 self.assertTrue(output.startswith('%autopx enabled'))
416 self.assertTrue(output.endswith('%autopx disabled'))
416 self.assertTrue(output.endswith('%autopx disabled'))
417 self.assertFalse('ZeroDivisionError' in output)
417 self.assertFalse('ZeroDivisionError' in output)
418 ar = v.get_result(-2)
418 ar = v.get_result(-2)
419 self.assertEquals(v['a'], 5)
419 self.assertEquals(v['a'], 5)
420 self.assertEquals(v['b'], 20)
420 self.assertEquals(v['b'], 20)
421 self.assertRaisesRemote(ZeroDivisionError, ar.get)
421 self.assertRaisesRemote(ZeroDivisionError, ar.get)
422
422
423 def test_magic_result(self):
423 def test_magic_result(self):
424 ip = get_ipython()
424 ip = get_ipython()
425 v = self.client[-1]
425 v = self.client[-1]
426 v.activate()
426 v.activate()
427 v['a'] = 111
427 v['a'] = 111
428 ra = v['a']
428 ra = v['a']
429
429
430 ar = ip.magic_result()
430 ar = ip.magic_result()
431 self.assertEquals(ar.msg_ids, [v.history[-1]])
431 self.assertEquals(ar.msg_ids, [v.history[-1]])
432 self.assertEquals(ar.get(), 111)
432 self.assertEquals(ar.get(), 111)
433 ar = ip.magic_result('-2')
433 ar = ip.magic_result('-2')
434 self.assertEquals(ar.msg_ids, [v.history[-2]])
434 self.assertEquals(ar.msg_ids, [v.history[-2]])
435
435
436 def test_unicode_execute(self):
436 def test_unicode_execute(self):
437 """test executing unicode strings"""
437 """test executing unicode strings"""
438 v = self.client[-1]
438 v = self.client[-1]
439 v.block=True
439 v.block=True
440 if sys.version_info[0] >= 3:
440 if sys.version_info[0] >= 3:
441 code="a='é'"
441 code="a='é'"
442 else:
442 else:
443 code=u"a=u'é'"
443 code=u"a=u'é'"
444 v.execute(code)
444 v.execute(code)
445 self.assertEquals(v['a'], u'é')
445 self.assertEquals(v['a'], u'é')
446
446
447 def test_unicode_apply_result(self):
447 def test_unicode_apply_result(self):
448 """test unicode apply results"""
448 """test unicode apply results"""
449 v = self.client[-1]
449 v = self.client[-1]
450 r = v.apply_sync(lambda : u'é')
450 r = v.apply_sync(lambda : u'é')
451 self.assertEquals(r, u'é')
451 self.assertEquals(r, u'é')
452
452
453 def test_unicode_apply_arg(self):
453 def test_unicode_apply_arg(self):
454 """test passing unicode arguments to apply"""
454 """test passing unicode arguments to apply"""
455 v = self.client[-1]
455 v = self.client[-1]
456
456
457 @interactive
457 @interactive
458 def check_unicode(a, check):
458 def check_unicode(a, check):
459 assert isinstance(a, unicode), "%r is not unicode"%a
459 assert isinstance(a, unicode), "%r is not unicode"%a
460 assert isinstance(check, bytes), "%r is not bytes"%check
460 assert isinstance(check, bytes), "%r is not bytes"%check
461 assert a.encode('utf8') == check, "%s != %s"%(a,check)
461 assert a.encode('utf8') == check, "%s != %s"%(a,check)
462
462
463 for s in [ u'é', u'ßø®∫',u'asdf' ]:
463 for s in [ u'é', u'ßø®∫',u'asdf' ]:
464 try:
464 try:
465 v.apply_sync(check_unicode, s, s.encode('utf8'))
465 v.apply_sync(check_unicode, s, s.encode('utf8'))
466 except error.RemoteError as e:
466 except error.RemoteError as e:
467 if e.ename == 'AssertionError':
467 if e.ename == 'AssertionError':
468 self.fail(e.evalue)
468 self.fail(e.evalue)
469 else:
469 else:
470 raise e
470 raise e
471
471
472 def test_map_reference(self):
472 def test_map_reference(self):
473 """view.map(<Reference>, *seqs) should work"""
473 """view.map(<Reference>, *seqs) should work"""
474 v = self.client[:]
474 v = self.client[:]
475 v.scatter('n', self.client.ids, flatten=True)
475 v.scatter('n', self.client.ids, flatten=True)
476 v.execute("f = lambda x,y: x*y")
476 v.execute("f = lambda x,y: x*y")
477 rf = pmod.Reference('f')
477 rf = pmod.Reference('f')
478 nlist = list(range(10))
478 nlist = list(range(10))
479 mlist = nlist[::-1]
479 mlist = nlist[::-1]
480 expected = [ m*n for m,n in zip(mlist, nlist) ]
480 expected = [ m*n for m,n in zip(mlist, nlist) ]
481 result = v.map_sync(rf, mlist, nlist)
481 result = v.map_sync(rf, mlist, nlist)
482 self.assertEquals(result, expected)
482 self.assertEquals(result, expected)
483
483
484 def test_apply_reference(self):
484 def test_apply_reference(self):
485 """view.apply(<Reference>, *args) should work"""
485 """view.apply(<Reference>, *args) should work"""
486 v = self.client[:]
486 v = self.client[:]
487 v.scatter('n', self.client.ids, flatten=True)
487 v.scatter('n', self.client.ids, flatten=True)
488 v.execute("f = lambda x: n*x")
488 v.execute("f = lambda x: n*x")
489 rf = pmod.Reference('f')
489 rf = pmod.Reference('f')
490 result = v.apply_sync(rf, 5)
490 result = v.apply_sync(rf, 5)
491 expected = [ 5*id for id in self.client.ids ]
491 expected = [ 5*id for id in self.client.ids ]
492 self.assertEquals(result, expected)
492 self.assertEquals(result, expected)
493
493
494 def test_eval_reference(self):
494 def test_eval_reference(self):
495 v = self.client[self.client.ids[0]]
495 v = self.client[self.client.ids[0]]
496 v['g'] = range(5)
496 v['g'] = range(5)
497 rg = pmod.Reference('g[0]')
497 rg = pmod.Reference('g[0]')
498 echo = lambda x:x
498 echo = lambda x:x
499 self.assertEquals(v.apply_sync(echo, rg), 0)
499 self.assertEquals(v.apply_sync(echo, rg), 0)
500
500
501 def test_reference_nameerror(self):
501 def test_reference_nameerror(self):
502 v = self.client[self.client.ids[0]]
502 v = self.client[self.client.ids[0]]
503 r = pmod.Reference('elvis_has_left')
503 r = pmod.Reference('elvis_has_left')
504 echo = lambda x:x
504 echo = lambda x:x
505 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
505 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
506
506
507
507
General Comments 0
You need to be logged in to leave comments. Login now