##// END OF EJS Templates
handle closed event loop in async script magics...
Min RK -
Show More
@@ -1,357 +1,375 b''
1 """Magic functions for running cells in various scripts."""
1 """Magic functions for running cells in various scripts."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import asyncio
6 import asyncio
7 import atexit
7 import atexit
8 import errno
8 import errno
9 import functools
9 import functools
10 import os
10 import os
11 import signal
11 import signal
12 import sys
12 import sys
13 import time
13 import time
14 from contextlib import contextmanager
14 from contextlib import contextmanager
15 from subprocess import CalledProcessError
15 from subprocess import CalledProcessError
16
16
17 from traitlets import Dict, List, default
17 from traitlets import Dict, List, default
18
18
19 from IPython.core import magic_arguments
19 from IPython.core import magic_arguments
20 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
20 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
21 from IPython.lib.backgroundjobs import BackgroundJobManager
21 from IPython.lib.backgroundjobs import BackgroundJobManager
22 from IPython.utils.process import arg_split
22 from IPython.utils.process import arg_split
23
23
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25 # Magic implementation classes
25 # Magic implementation classes
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27
27
28 def script_args(f):
28 def script_args(f):
29 """single decorator for adding script args"""
29 """single decorator for adding script args"""
30 args = [
30 args = [
31 magic_arguments.argument(
31 magic_arguments.argument(
32 '--out', type=str,
32 '--out', type=str,
33 help="""The variable in which to store stdout from the script.
33 help="""The variable in which to store stdout from the script.
34 If the script is backgrounded, this will be the stdout *pipe*,
34 If the script is backgrounded, this will be the stdout *pipe*,
35 instead of the stderr text itself and will not be auto closed.
35 instead of the stderr text itself and will not be auto closed.
36 """
36 """
37 ),
37 ),
38 magic_arguments.argument(
38 magic_arguments.argument(
39 '--err', type=str,
39 '--err', type=str,
40 help="""The variable in which to store stderr from the script.
40 help="""The variable in which to store stderr from the script.
41 If the script is backgrounded, this will be the stderr *pipe*,
41 If the script is backgrounded, this will be the stderr *pipe*,
42 instead of the stderr text itself and will not be autoclosed.
42 instead of the stderr text itself and will not be autoclosed.
43 """
43 """
44 ),
44 ),
45 magic_arguments.argument(
45 magic_arguments.argument(
46 '--bg', action="store_true",
46 '--bg', action="store_true",
47 help="""Whether to run the script in the background.
47 help="""Whether to run the script in the background.
48 If given, the only way to see the output of the command is
48 If given, the only way to see the output of the command is
49 with --out/err.
49 with --out/err.
50 """
50 """
51 ),
51 ),
52 magic_arguments.argument(
52 magic_arguments.argument(
53 '--proc', type=str,
53 '--proc', type=str,
54 help="""The variable in which to store Popen instance.
54 help="""The variable in which to store Popen instance.
55 This is used only when --bg option is given.
55 This is used only when --bg option is given.
56 """
56 """
57 ),
57 ),
58 magic_arguments.argument(
58 magic_arguments.argument(
59 '--no-raise-error', action="store_false", dest='raise_error',
59 '--no-raise-error', action="store_false", dest='raise_error',
60 help="""Whether you should raise an error message in addition to
60 help="""Whether you should raise an error message in addition to
61 a stream on stderr if you get a nonzero exit code.
61 a stream on stderr if you get a nonzero exit code.
62 """
62 """
63 )
63 )
64 ]
64 ]
65 for arg in args:
65 for arg in args:
66 f = arg(f)
66 f = arg(f)
67 return f
67 return f
68
68
69
69
70 @contextmanager
70 @contextmanager
71 def safe_watcher():
71 def safe_watcher():
72 if sys.platform == "win32":
72 if sys.platform == "win32":
73 yield
73 yield
74 return
74 return
75
75
76 from asyncio import SafeChildWatcher
76 from asyncio import SafeChildWatcher
77
77
78 policy = asyncio.get_event_loop_policy()
78 policy = asyncio.get_event_loop_policy()
79 old_watcher = policy.get_child_watcher()
79 old_watcher = policy.get_child_watcher()
80 if isinstance(old_watcher, SafeChildWatcher):
80 if isinstance(old_watcher, SafeChildWatcher):
81 yield
81 yield
82 return
82 return
83
83
84 loop = policy.get_event_loop()
84 try:
85 loop = policy.get_event_loop()
86 if loop.is_closed():
87 raise RuntimeError("open a new one")
88 except RuntimeError:
89 # closed loop, make a new one
90 loop = policy.new_event_loop()
91 policy.set_event_loop(loop)
92
85 try:
93 try:
86 watcher = asyncio.SafeChildWatcher()
94 watcher = asyncio.SafeChildWatcher()
87 watcher.attach_loop(loop)
95 watcher.attach_loop(loop)
88 policy.set_child_watcher(watcher)
96 policy.set_child_watcher(watcher)
89 yield
97 yield
90 finally:
98 finally:
91 watcher.close()
99 watcher.close()
92 policy.set_child_watcher(old_watcher)
100 policy.set_child_watcher(old_watcher)
93
101
94
102
95 def dec_safe_watcher(fun):
103 def dec_safe_watcher(fun):
96 @functools.wraps(fun)
104 @functools.wraps(fun)
97 def _inner(*args, **kwargs):
105 def _inner(*args, **kwargs):
98 with safe_watcher():
106 with safe_watcher():
99 return fun(*args, **kwargs)
107 return fun(*args, **kwargs)
100
108
101 return _inner
109 return _inner
102
110
103
111
104 @magics_class
112 @magics_class
105 class ScriptMagics(Magics):
113 class ScriptMagics(Magics):
106 """Magics for talking to scripts
114 """Magics for talking to scripts
107
115
108 This defines a base `%%script` cell magic for running a cell
116 This defines a base `%%script` cell magic for running a cell
109 with a program in a subprocess, and registers a few top-level
117 with a program in a subprocess, and registers a few top-level
110 magics that call %%script with common interpreters.
118 magics that call %%script with common interpreters.
111 """
119 """
112 script_magics = List(
120 script_magics = List(
113 help="""Extra script cell magics to define
121 help="""Extra script cell magics to define
114
122
115 This generates simple wrappers of `%%script foo` as `%%foo`.
123 This generates simple wrappers of `%%script foo` as `%%foo`.
116
124
117 If you want to add script magics that aren't on your path,
125 If you want to add script magics that aren't on your path,
118 specify them in script_paths
126 specify them in script_paths
119 """,
127 """,
120 ).tag(config=True)
128 ).tag(config=True)
121 @default('script_magics')
129 @default('script_magics')
122 def _script_magics_default(self):
130 def _script_magics_default(self):
123 """default to a common list of programs"""
131 """default to a common list of programs"""
124
132
125 defaults = [
133 defaults = [
126 'sh',
134 'sh',
127 'bash',
135 'bash',
128 'perl',
136 'perl',
129 'ruby',
137 'ruby',
130 'python',
138 'python',
131 'python2',
139 'python2',
132 'python3',
140 'python3',
133 'pypy',
141 'pypy',
134 ]
142 ]
135 if os.name == 'nt':
143 if os.name == 'nt':
136 defaults.extend([
144 defaults.extend([
137 'cmd',
145 'cmd',
138 ])
146 ])
139
147
140 return defaults
148 return defaults
141
149
142 script_paths = Dict(
150 script_paths = Dict(
143 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
151 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
144
152
145 Only necessary for items in script_magics where the default path will not
153 Only necessary for items in script_magics where the default path will not
146 find the right interpreter.
154 find the right interpreter.
147 """
155 """
148 ).tag(config=True)
156 ).tag(config=True)
149
157
150 def __init__(self, shell=None):
158 def __init__(self, shell=None):
151 super(ScriptMagics, self).__init__(shell=shell)
159 super(ScriptMagics, self).__init__(shell=shell)
152 self._generate_script_magics()
160 self._generate_script_magics()
153 self.job_manager = BackgroundJobManager()
161 self.job_manager = BackgroundJobManager()
154 self.bg_processes = []
162 self.bg_processes = []
155 atexit.register(self.kill_bg_processes)
163 atexit.register(self.kill_bg_processes)
156
164
157 def __del__(self):
165 def __del__(self):
158 self.kill_bg_processes()
166 self.kill_bg_processes()
159
167
160 def _generate_script_magics(self):
168 def _generate_script_magics(self):
161 cell_magics = self.magics['cell']
169 cell_magics = self.magics['cell']
162 for name in self.script_magics:
170 for name in self.script_magics:
163 cell_magics[name] = self._make_script_magic(name)
171 cell_magics[name] = self._make_script_magic(name)
164
172
165 def _make_script_magic(self, name):
173 def _make_script_magic(self, name):
166 """make a named magic, that calls %%script with a particular program"""
174 """make a named magic, that calls %%script with a particular program"""
167 # expand to explicit path if necessary:
175 # expand to explicit path if necessary:
168 script = self.script_paths.get(name, name)
176 script = self.script_paths.get(name, name)
169
177
170 @magic_arguments.magic_arguments()
178 @magic_arguments.magic_arguments()
171 @script_args
179 @script_args
172 def named_script_magic(line, cell):
180 def named_script_magic(line, cell):
173 # if line, add it as cl-flags
181 # if line, add it as cl-flags
174 if line:
182 if line:
175 line = "%s %s" % (script, line)
183 line = "%s %s" % (script, line)
176 else:
184 else:
177 line = script
185 line = script
178 return self.shebang(line, cell)
186 return self.shebang(line, cell)
179
187
180 # write a basic docstring:
188 # write a basic docstring:
181 named_script_magic.__doc__ = \
189 named_script_magic.__doc__ = \
182 """%%{name} script magic
190 """%%{name} script magic
183
191
184 Run cells with {script} in a subprocess.
192 Run cells with {script} in a subprocess.
185
193
186 This is a shortcut for `%%script {script}`
194 This is a shortcut for `%%script {script}`
187 """.format(**locals())
195 """.format(**locals())
188
196
189 return named_script_magic
197 return named_script_magic
190
198
191 @magic_arguments.magic_arguments()
199 @magic_arguments.magic_arguments()
192 @script_args
200 @script_args
193 @cell_magic("script")
201 @cell_magic("script")
194 @dec_safe_watcher
202 @dec_safe_watcher
195 def shebang(self, line, cell):
203 def shebang(self, line, cell):
196 """Run a cell via a shell command
204 """Run a cell via a shell command
197
205
198 The `%%script` line is like the #! line of script,
206 The `%%script` line is like the #! line of script,
199 specifying a program (bash, perl, ruby, etc.) with which to run.
207 specifying a program (bash, perl, ruby, etc.) with which to run.
200
208
201 The rest of the cell is run by that program.
209 The rest of the cell is run by that program.
202
210
203 Examples
211 Examples
204 --------
212 --------
205 ::
213 ::
206
214
207 In [1]: %%script bash
215 In [1]: %%script bash
208 ...: for i in 1 2 3; do
216 ...: for i in 1 2 3; do
209 ...: echo $i
217 ...: echo $i
210 ...: done
218 ...: done
211 1
219 1
212 2
220 2
213 3
221 3
214 """
222 """
215
223
216 async def _handle_stream(stream, stream_arg, file_object):
224 async def _handle_stream(stream, stream_arg, file_object):
217 while True:
225 while True:
218 line = (await stream.readline()).decode("utf8")
226 line = (await stream.readline()).decode("utf8")
219 if not line:
227 if not line:
220 break
228 break
221 if stream_arg:
229 if stream_arg:
222 self.shell.user_ns[stream_arg] = line
230 self.shell.user_ns[stream_arg] = line
223 else:
231 else:
224 file_object.write(line)
232 file_object.write(line)
225 file_object.flush()
233 file_object.flush()
226
234
227 async def _stream_communicate(process, cell):
235 async def _stream_communicate(process, cell):
228 process.stdin.write(cell)
236 process.stdin.write(cell)
229 process.stdin.close()
237 process.stdin.close()
230 stdout_task = asyncio.create_task(
238 stdout_task = asyncio.create_task(
231 _handle_stream(process.stdout, args.out, sys.stdout)
239 _handle_stream(process.stdout, args.out, sys.stdout)
232 )
240 )
233 stderr_task = asyncio.create_task(
241 stderr_task = asyncio.create_task(
234 _handle_stream(process.stderr, args.err, sys.stderr)
242 _handle_stream(process.stderr, args.err, sys.stderr)
235 )
243 )
236 await asyncio.wait([stdout_task, stderr_task])
244 await asyncio.wait([stdout_task, stderr_task])
237 await process.wait()
245 await process.wait()
238
246
239 if sys.platform.startswith("win"):
247 policy = asyncio.get_event_loop_policy()
240 asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
248 if sys.platform.startswith("win") and not isinstance(
241 loop = asyncio.get_event_loop_policy().get_event_loop()
249 policy, asyncio.WindowsProactorEventLoopPolicy
250 ):
251 # _do not_ overwrite the current policy
252 policy = asyncio.WindowsProactorEventLoopPolicy()
253
254 try:
255 loop = policy.get_event_loop()
256 except RuntimeError:
257 # closed loop, make a new one
258 loop = policy.new_event_loop()
259 policy.set_event_loop(loop)
242 argv = arg_split(line, posix=not sys.platform.startswith("win"))
260 argv = arg_split(line, posix=not sys.platform.startswith("win"))
243 args, cmd = self.shebang.parser.parse_known_args(argv)
261 args, cmd = self.shebang.parser.parse_known_args(argv)
244 try:
262 try:
245 p = loop.run_until_complete(
263 p = loop.run_until_complete(
246 asyncio.create_subprocess_exec(
264 asyncio.create_subprocess_exec(
247 *cmd,
265 *cmd,
248 stdout=asyncio.subprocess.PIPE,
266 stdout=asyncio.subprocess.PIPE,
249 stderr=asyncio.subprocess.PIPE,
267 stderr=asyncio.subprocess.PIPE,
250 stdin=asyncio.subprocess.PIPE,
268 stdin=asyncio.subprocess.PIPE,
251 )
269 )
252 )
270 )
253 except OSError as e:
271 except OSError as e:
254 if e.errno == errno.ENOENT:
272 if e.errno == errno.ENOENT:
255 print("Couldn't find program: %r" % cmd[0])
273 print("Couldn't find program: %r" % cmd[0])
256 return
274 return
257 else:
275 else:
258 raise
276 raise
259
277
260 if not cell.endswith('\n'):
278 if not cell.endswith('\n'):
261 cell += '\n'
279 cell += '\n'
262 cell = cell.encode('utf8', 'replace')
280 cell = cell.encode('utf8', 'replace')
263 if args.bg:
281 if args.bg:
264 self.bg_processes.append(p)
282 self.bg_processes.append(p)
265 self._gc_bg_processes()
283 self._gc_bg_processes()
266 to_close = []
284 to_close = []
267 if args.out:
285 if args.out:
268 self.shell.user_ns[args.out] = p.stdout
286 self.shell.user_ns[args.out] = p.stdout
269 else:
287 else:
270 to_close.append(p.stdout)
288 to_close.append(p.stdout)
271 if args.err:
289 if args.err:
272 self.shell.user_ns[args.err] = p.stderr
290 self.shell.user_ns[args.err] = p.stderr
273 else:
291 else:
274 to_close.append(p.stderr)
292 to_close.append(p.stderr)
275 self.job_manager.new(self._run_script, p, cell, to_close, daemon=True)
293 self.job_manager.new(self._run_script, p, cell, to_close, daemon=True)
276 if args.proc:
294 if args.proc:
277 self.shell.user_ns[args.proc] = p
295 self.shell.user_ns[args.proc] = p
278 return
296 return
279
297
280 try:
298 try:
281 loop.run_until_complete(_stream_communicate(p, cell))
299 loop.run_until_complete(_stream_communicate(p, cell))
282 except KeyboardInterrupt:
300 except KeyboardInterrupt:
283 try:
301 try:
284 p.send_signal(signal.SIGINT)
302 p.send_signal(signal.SIGINT)
285 time.sleep(0.1)
303 time.sleep(0.1)
286 if p.returncode is not None:
304 if p.returncode is not None:
287 print("Process is interrupted.")
305 print("Process is interrupted.")
288 return
306 return
289 p.terminate()
307 p.terminate()
290 time.sleep(0.1)
308 time.sleep(0.1)
291 if p.returncode is not None:
309 if p.returncode is not None:
292 print("Process is terminated.")
310 print("Process is terminated.")
293 return
311 return
294 p.kill()
312 p.kill()
295 print("Process is killed.")
313 print("Process is killed.")
296 except OSError:
314 except OSError:
297 pass
315 pass
298 except Exception as e:
316 except Exception as e:
299 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
317 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
300 return
318 return
301 if args.raise_error and p.returncode!=0:
319 if args.raise_error and p.returncode!=0:
302 # If we get here and p.returncode is still None, we must have
320 # If we get here and p.returncode is still None, we must have
303 # killed it but not yet seen its return code. We don't wait for it,
321 # killed it but not yet seen its return code. We don't wait for it,
304 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
322 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
305 rc = p.returncode or -9
323 rc = p.returncode or -9
306 raise CalledProcessError(rc, cell)
324 raise CalledProcessError(rc, cell)
307
325
308 shebang.__skip_doctest__ = os.name != "posix"
326 shebang.__skip_doctest__ = os.name != "posix"
309
327
310 def _run_script(self, p, cell, to_close):
328 def _run_script(self, p, cell, to_close):
311 """callback for running the script in the background"""
329 """callback for running the script in the background"""
312 p.stdin.write(cell)
330 p.stdin.write(cell)
313 p.stdin.close()
331 p.stdin.close()
314 for s in to_close:
332 for s in to_close:
315 s.close()
333 s.close()
316 p.wait()
334 p.wait()
317
335
318 @line_magic("killbgscripts")
336 @line_magic("killbgscripts")
319 def killbgscripts(self, _nouse_=''):
337 def killbgscripts(self, _nouse_=''):
320 """Kill all BG processes started by %%script and its family."""
338 """Kill all BG processes started by %%script and its family."""
321 self.kill_bg_processes()
339 self.kill_bg_processes()
322 print("All background processes were killed.")
340 print("All background processes were killed.")
323
341
324 def kill_bg_processes(self):
342 def kill_bg_processes(self):
325 """Kill all BG processes which are still running."""
343 """Kill all BG processes which are still running."""
326 if not self.bg_processes:
344 if not self.bg_processes:
327 return
345 return
328 for p in self.bg_processes:
346 for p in self.bg_processes:
329 if p.returncode is None:
347 if p.returncode is None:
330 try:
348 try:
331 p.send_signal(signal.SIGINT)
349 p.send_signal(signal.SIGINT)
332 except:
350 except:
333 pass
351 pass
334 time.sleep(0.1)
352 time.sleep(0.1)
335 self._gc_bg_processes()
353 self._gc_bg_processes()
336 if not self.bg_processes:
354 if not self.bg_processes:
337 return
355 return
338 for p in self.bg_processes:
356 for p in self.bg_processes:
339 if p.returncode is None:
357 if p.returncode is None:
340 try:
358 try:
341 p.terminate()
359 p.terminate()
342 except:
360 except:
343 pass
361 pass
344 time.sleep(0.1)
362 time.sleep(0.1)
345 self._gc_bg_processes()
363 self._gc_bg_processes()
346 if not self.bg_processes:
364 if not self.bg_processes:
347 return
365 return
348 for p in self.bg_processes:
366 for p in self.bg_processes:
349 if p.returncode is None:
367 if p.returncode is None:
350 try:
368 try:
351 p.kill()
369 p.kill()
352 except:
370 except:
353 pass
371 pass
354 self._gc_bg_processes()
372 self._gc_bg_processes()
355
373
356 def _gc_bg_processes(self):
374 def _gc_bg_processes(self):
357 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
375 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
General Comments 0
You need to be logged in to leave comments. Login now