##// END OF EJS Templates
workaround Windows lack of fork in subprocess tests...
MinRK -
Show More
@@ -1,203 +1,198 b''
1 1 """test the IPython Kernel"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2013 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import shutil
16 16 import tempfile
17 17
18 18 from Queue import Empty
19 19 from contextlib import contextmanager
20 20 from subprocess import PIPE
21 21
22 22 import nose.tools as nt
23 23
24 24 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 25 from IPython.zmq.tests.test_message_spec import execute, flush_channels
26 26 from IPython.testing import decorators as dec
27 27 from IPython.utils import path, py3compat
28 28
29 29 #-------------------------------------------------------------------------------
30 30 # Tests
31 31 #-------------------------------------------------------------------------------
32 32
33 33 def setup():
34 34 """setup temporary IPYTHONDIR for tests"""
35 35 global IPYTHONDIR
36 36 global save_env
37 37 global save_get_ipython_dir
38 38
39 39 IPYTHONDIR = tempfile.mkdtemp()
40 40
41 41 save_env = os.environ.copy()
42 42 os.environ["IPYTHONDIR"] = IPYTHONDIR
43 43
44 44 save_get_ipython_dir = path.get_ipython_dir
45 45 path.get_ipython_dir = lambda : IPYTHONDIR
46 46
47 47
48 48 def teardown():
49 49 path.get_ipython_dir = save_get_ipython_dir
50 50 os.environ = save_env
51 51
52 52 try:
53 53 shutil.rmtree(IPYTHONDIR)
54 54 except (OSError, IOError):
55 55 # no such file
56 56 pass
57 57
58 58
59 59 @contextmanager
60 60 def new_kernel():
61 61 """start a kernel in a subprocess, and wait for it to be ready
62 62
63 63 Returns
64 64 -------
65 65 kernel_manager: connected KernelManager instance
66 66 """
67 67 KM = BlockingKernelManager()
68 68
69 69 KM.start_kernel(stdout=PIPE, stderr=PIPE)
70 70 KM.start_channels()
71 71
72 72 # wait for kernel to be ready
73 73 KM.shell_channel.execute("import sys")
74 74 KM.shell_channel.get_msg(block=True, timeout=5)
75 75 flush_channels(KM)
76 76 try:
77 77 yield KM
78 78 finally:
79 79 KM.stop_channels()
80 80 KM.shutdown_kernel()
81 81
82 82
83 83 def assemble_output(iopub):
84 84 """assemble stdout/err from an execution"""
85 85 stdout = ''
86 86 stderr = ''
87 87 while True:
88 88 msg = iopub.get_msg(block=True, timeout=1)
89 89 msg_type = msg['msg_type']
90 90 content = msg['content']
91 91 if msg_type == 'status' and content['execution_state'] == 'idle':
92 92 # idle message signals end of output
93 93 break
94 94 elif msg['msg_type'] == 'stream':
95 95 if content['name'] == 'stdout':
96 96 stdout = stdout + content['data']
97 97 elif content['name'] == 'stderr':
98 98 stderr = stderr + content['data']
99 99 else:
100 100 raise KeyError("bad stream: %r" % content['name'])
101 101 else:
102 102 # other output, ignored
103 103 pass
104 104 return stdout, stderr
105 105
106 106
107 107 def _check_mp_mode(km, expected=False, stream="stdout"):
108 108 execute(km=km, code="import sys")
109 109 flush_channels(km)
110 110 msg_id, content = execute(km=km, code="print (sys.%s._check_mp_mode())" % stream)
111 111 stdout, stderr = assemble_output(km.iopub_channel)
112 112 nt.assert_equal(eval(stdout.strip()), expected)
113 113
114 114
115 115 def test_simple_print():
116 116 """simple print statement in kernel"""
117 117 with new_kernel() as km:
118 118 iopub = km.iopub_channel
119 119 msg_id, content = execute(km=km, code="print ('hi')")
120 120 stdout, stderr = assemble_output(iopub)
121 121 nt.assert_equal(stdout, 'hi\n')
122 122 nt.assert_equal(stderr, '')
123 123 _check_mp_mode(km, expected=False)
124 124 print ('hello')
125 125
126 126
127 127 def test_subprocess_print():
128 128 """printing from forked mp.Process"""
129 129 with new_kernel() as km:
130 130 iopub = km.iopub_channel
131 131
132 132 _check_mp_mode(km, expected=False)
133 133 flush_channels(km)
134 134 np = 5
135 135 code = '\n'.join([
136 "from __future__ import print_function",
136 137 "import multiprocessing as mp",
137 "def f(x):",
138 " print('hello',x)",
139 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
138 "pool = [mp.Process(target=print, args=('hello', i,)) for i in range(%i)]" % np,
140 139 "for p in pool: p.start()",
141 140 "for p in pool: p.join()"
142 141 ])
143 142
144 143 expected = '\n'.join([
145 144 "hello %s" % i for i in range(np)
146 145 ]) + '\n'
147 146
148 147 msg_id, content = execute(km=km, code=code)
149 148 stdout, stderr = assemble_output(iopub)
150 149 nt.assert_equal(stdout.count("hello"), np, stdout)
151 150 for n in range(np):
152 151 nt.assert_equal(stdout.count(str(n)), 1, stdout)
153 152 nt.assert_equal(stderr, '')
154 153 _check_mp_mode(km, expected=False)
155 154 _check_mp_mode(km, expected=False, stream="stderr")
156 155
157 156
158 157 def test_subprocess_noprint():
159 158 """mp.Process without print doesn't trigger iostream mp_mode"""
160 159 with new_kernel() as km:
161 160 iopub = km.iopub_channel
162 161
163 162 np = 5
164 163 code = '\n'.join([
165 164 "import multiprocessing as mp",
166 "def f(x):",
167 " return x",
168 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
165 "pool = [mp.Process(target=range,args=(i,)) for i in range(%i)]" % np,
169 166 "for p in pool: p.start()",
170 167 "for p in pool: p.join()"
171 168 ])
172 169
173 170 msg_id, content = execute(km=km, code=code)
174 171 stdout, stderr = assemble_output(iopub)
175 172 nt.assert_equal(stdout, '')
176 173 nt.assert_equal(stderr, '')
177 174
178 175 _check_mp_mode(km, expected=False)
179 176 _check_mp_mode(km, expected=False, stream="stderr")
180 177
181 178
182 179 def test_subprocess_error():
183 180 """error in mp.Process doesn't crash"""
184 181 with new_kernel() as km:
185 182 iopub = km.iopub_channel
186 183
187 184 code = '\n'.join([
188 185 "import multiprocessing as mp",
189 "def f():",
190 " return 1/0",
191 "p = mp.Process(target=f)",
186 "p = mp.Process(target=int, args=('hi',))",
192 187 "p.start()",
193 188 "p.join()",
194 189 ])
195 190
196 191 msg_id, content = execute(km=km, code=code)
197 192 stdout, stderr = assemble_output(iopub)
198 193 nt.assert_equal(stdout, '')
199 nt.assert_true("ZeroDivisionError" in stderr, stderr)
194 nt.assert_true("ValueError" in stderr, stderr)
200 195
201 196 _check_mp_mode(km, expected=False)
202 197 _check_mp_mode(km, expected=False, stream="stderr")
203 198
General Comments 0
You need to be logged in to leave comments. Login now