##// END OF EJS Templates
.
Garland Zhang -
Show More
@@ -1,371 +1,371 b''
1 """Magic functions for running cells in various scripts."""
1 """Magic functions for running cells in various scripts."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import asyncio
6 import asyncio
7 import asyncio.exceptions
7 import asyncio.exceptions
8 import atexit
8 import atexit
9 import errno
9 import errno
10 import os
10 import os
11 import signal
11 import signal
12 import sys
12 import sys
13 import time
13 import time
14 from subprocess import CalledProcessError
14 from subprocess import CalledProcessError
15 from threading import Thread
15 from threading import Thread
16
16
17 from traitlets import Any, Dict, List, default
17 from traitlets import Any, Dict, List, default
18
18
19 from IPython.core import magic_arguments
19 from IPython.core import magic_arguments
20 from IPython.core.async_helpers import _AsyncIOProxy
20 from IPython.core.async_helpers import _AsyncIOProxy
21 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
21 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
22 from IPython.utils.process import arg_split
22 from IPython.utils.process import arg_split
23
23
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25 # Magic implementation classes
25 # Magic implementation classes
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27
27
28 def script_args(f):
28 def script_args(f):
29 """single decorator for adding script args"""
29 """single decorator for adding script args"""
30 args = [
30 args = [
31 magic_arguments.argument(
31 magic_arguments.argument(
32 '--out', type=str,
32 '--out', type=str,
33 help="""The variable in which to store stdout from the script.
33 help="""The variable in which to store stdout from the script.
34 If the script is backgrounded, this will be the stdout *pipe*,
34 If the script is backgrounded, this will be the stdout *pipe*,
35 instead of the stderr text itself and will not be auto closed.
35 instead of the stderr text itself and will not be auto closed.
36 """
36 """
37 ),
37 ),
38 magic_arguments.argument(
38 magic_arguments.argument(
39 '--err', type=str,
39 '--err', type=str,
40 help="""The variable in which to store stderr from the script.
40 help="""The variable in which to store stderr from the script.
41 If the script is backgrounded, this will be the stderr *pipe*,
41 If the script is backgrounded, this will be the stderr *pipe*,
42 instead of the stderr text itself and will not be autoclosed.
42 instead of the stderr text itself and will not be autoclosed.
43 """
43 """
44 ),
44 ),
45 magic_arguments.argument(
45 magic_arguments.argument(
46 '--bg', action="store_true",
46 '--bg', action="store_true",
47 help="""Whether to run the script in the background.
47 help="""Whether to run the script in the background.
48 If given, the only way to see the output of the command is
48 If given, the only way to see the output of the command is
49 with --out/err.
49 with --out/err.
50 """
50 """
51 ),
51 ),
52 magic_arguments.argument(
52 magic_arguments.argument(
53 '--proc', type=str,
53 '--proc', type=str,
54 help="""The variable in which to store Popen instance.
54 help="""The variable in which to store Popen instance.
55 This is used only when --bg option is given.
55 This is used only when --bg option is given.
56 """
56 """
57 ),
57 ),
58 magic_arguments.argument(
58 magic_arguments.argument(
59 '--no-raise-error', action="store_false", dest='raise_error',
59 '--no-raise-error', action="store_false", dest='raise_error',
60 help="""Whether you should raise an error message in addition to
60 help="""Whether you should raise an error message in addition to
61 a stream on stderr if you get a nonzero exit code.
61 a stream on stderr if you get a nonzero exit code.
62 """,
62 """,
63 ),
63 ),
64 ]
64 ]
65 for arg in args:
65 for arg in args:
66 f = arg(f)
66 f = arg(f)
67 return f
67 return f
68
68
69
69
70 @magics_class
70 @magics_class
71 class ScriptMagics(Magics):
71 class ScriptMagics(Magics):
72 """Magics for talking to scripts
72 """Magics for talking to scripts
73
73
74 This defines a base `%%script` cell magic for running a cell
74 This defines a base `%%script` cell magic for running a cell
75 with a program in a subprocess, and registers a few top-level
75 with a program in a subprocess, and registers a few top-level
76 magics that call %%script with common interpreters.
76 magics that call %%script with common interpreters.
77 """
77 """
78
78
79 event_loop = Any(
79 event_loop = Any(
80 help="""
80 help="""
81 The event loop on which to run subprocesses
81 The event loop on which to run subprocesses
82
82
83 Not the main event loop,
83 Not the main event loop,
84 because we want to be able to make blocking calls
84 because we want to be able to make blocking calls
85 and have certain requirements we don't want to impose on the main loop.
85 and have certain requirements we don't want to impose on the main loop.
86 """
86 """
87 )
87 )
88
88
89 script_magics = List(
89 script_magics = List(
90 help="""Extra script cell magics to define
90 help="""Extra script cell magics to define
91
91
92 This generates simple wrappers of `%%script foo` as `%%foo`.
92 This generates simple wrappers of `%%script foo` as `%%foo`.
93
93
94 If you want to add script magics that aren't on your path,
94 If you want to add script magics that aren't on your path,
95 specify them in script_paths
95 specify them in script_paths
96 """,
96 """,
97 ).tag(config=True)
97 ).tag(config=True)
98 @default('script_magics')
98 @default('script_magics')
99 def _script_magics_default(self):
99 def _script_magics_default(self):
100 """default to a common list of programs"""
100 """default to a common list of programs"""
101
101
102 defaults = [
102 defaults = [
103 'sh',
103 'sh',
104 'bash',
104 'bash',
105 'perl',
105 'perl',
106 'ruby',
106 'ruby',
107 'python',
107 'python',
108 'python2',
108 'python2',
109 'python3',
109 'python3',
110 'pypy',
110 'pypy',
111 ]
111 ]
112 if os.name == 'nt':
112 if os.name == 'nt':
113 defaults.extend([
113 defaults.extend([
114 'cmd',
114 'cmd',
115 ])
115 ])
116
116
117 return defaults
117 return defaults
118
118
119 script_paths = Dict(
119 script_paths = Dict(
120 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
120 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
121
121
122 Only necessary for items in script_magics where the default path will not
122 Only necessary for items in script_magics where the default path will not
123 find the right interpreter.
123 find the right interpreter.
124 """
124 """
125 ).tag(config=True)
125 ).tag(config=True)
126
126
127 def __init__(self, shell=None):
127 def __init__(self, shell=None):
128 super(ScriptMagics, self).__init__(shell=shell)
128 super(ScriptMagics, self).__init__(shell=shell)
129 self._generate_script_magics()
129 self._generate_script_magics()
130 self.bg_processes = []
130 self.bg_processes = []
131 atexit.register(self.kill_bg_processes)
131 atexit.register(self.kill_bg_processes)
132
132
133 def __del__(self):
133 def __del__(self):
134 self.kill_bg_processes()
134 self.kill_bg_processes()
135
135
136 def _generate_script_magics(self):
136 def _generate_script_magics(self):
137 cell_magics = self.magics['cell']
137 cell_magics = self.magics['cell']
138 for name in self.script_magics:
138 for name in self.script_magics:
139 cell_magics[name] = self._make_script_magic(name)
139 cell_magics[name] = self._make_script_magic(name)
140
140
141 def _make_script_magic(self, name):
141 def _make_script_magic(self, name):
142 """make a named magic, that calls %%script with a particular program"""
142 """make a named magic, that calls %%script with a particular program"""
143 # expand to explicit path if necessary:
143 # expand to explicit path if necessary:
144 script = self.script_paths.get(name, name)
144 script = self.script_paths.get(name, name)
145
145
146 @magic_arguments.magic_arguments()
146 @magic_arguments.magic_arguments()
147 @script_args
147 @script_args
148 def named_script_magic(line, cell):
148 def named_script_magic(line, cell):
149 # if line, add it as cl-flags
149 # if line, add it as cl-flags
150 if line:
150 if line:
151 line = "%s %s" % (script, line)
151 line = "%s %s" % (script, line)
152 else:
152 else:
153 line = script
153 line = script
154 return self.shebang(line, cell)
154 return self.shebang(line, cell)
155
155
156 # write a basic docstring:
156 # write a basic docstring:
157 named_script_magic.__doc__ = \
157 named_script_magic.__doc__ = \
158 """%%{name} script magic
158 """%%{name} script magic
159
159
160 Run cells with {script} in a subprocess.
160 Run cells with {script} in a subprocess.
161
161
162 This is a shortcut for `%%script {script}`
162 This is a shortcut for `%%script {script}`
163 """.format(**locals())
163 """.format(**locals())
164
164
165 return named_script_magic
165 return named_script_magic
166
166
167 @magic_arguments.magic_arguments()
167 @magic_arguments.magic_arguments()
168 @script_args
168 @script_args
169 @cell_magic("script")
169 @cell_magic("script")
170 def shebang(self, line, cell):
170 def shebang(self, line, cell):
171 """Run a cell via a shell command
171 """Run a cell via a shell command
172
172
173 The `%%script` line is like the #! line of script,
173 The `%%script` line is like the #! line of script,
174 specifying a program (bash, perl, ruby, etc.) with which to run.
174 specifying a program (bash, perl, ruby, etc.) with which to run.
175
175
176 The rest of the cell is run by that program.
176 The rest of the cell is run by that program.
177
177
178 Examples
178 Examples
179 --------
179 --------
180 ::
180 ::
181
181
182 In [1]: %%script bash
182 In [1]: %%script bash
183 ...: for i in 1 2 3; do
183 ...: for i in 1 2 3; do
184 ...: echo $i
184 ...: echo $i
185 ...: done
185 ...: done
186 1
186 1
187 2
187 2
188 3
188 3
189 """
189 """
190
190
191 # Create the event loop in which to run script magics
191 # Create the event loop in which to run script magics
192 # this operates on a background thread
192 # this operates on a background thread
193 if self.event_loop is None:
193 if self.event_loop is None:
194 if sys.platform == "win32":
194 if sys.platform == "win32":
195 # don't override the current policy,
195 # don't override the current policy,
196 # just create an event loop
196 # just create an event loop
197 event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
197 event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
198 else:
198 else:
199 event_loop = asyncio.new_event_loop()
199 event_loop = asyncio.new_event_loop()
200 self.event_loop = event_loop
200 self.event_loop = event_loop
201
201
202 # start the loop in a background thread
202 # start the loop in a background thread
203 asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
203 asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
204 asyncio_thread.start()
204 asyncio_thread.start()
205 else:
205 else:
206 event_loop = self.event_loop
206 event_loop = self.event_loop
207
207
208 def in_thread(coro):
208 def in_thread(coro):
209 """Call a coroutine on the asyncio thread"""
209 """Call a coroutine on the asyncio thread"""
210 return asyncio.run_coroutine_threadsafe(coro, event_loop).result()
210 return asyncio.run_coroutine_threadsafe(coro, event_loop).result()
211
211
212 async def _readchunk(stream):
212 async def _readchunk(stream):
213 try:
213 try:
214 return await stream.readuntil(b'\n')
214 return await stream.readuntil(b"\n")
215 except asyncio.exceptions.IncompleteReadError as e:
215 except asyncio.exceptions.IncompleteReadError as e:
216 return e.partial
216 return e.partial
217 except asyncio.exceptions.LimitOverrunError as e:
217 except asyncio.exceptions.LimitOverrunError as e:
218 return await stream.read(e.consumed)
218 return await stream.read(e.consumed)
219
219
220 async def _handle_stream(stream, stream_arg, file_object):
220 async def _handle_stream(stream, stream_arg, file_object):
221 while True:
221 while True:
222 chunk = (await _readchunk(stream)).decode("utf8", errors="replace")
222 chunk = (await _readchunk(stream)).decode("utf8", errors="replace")
223 if not chunk:
223 if not chunk:
224 break
224 break
225 if stream_arg:
225 if stream_arg:
226 self.shell.user_ns[stream_arg] = chunk
226 self.shell.user_ns[stream_arg] = chunk
227 else:
227 else:
228 file_object.write(chunk)
228 file_object.write(chunk)
229 file_object.flush()
229 file_object.flush()
230
230
231 async def _stream_communicate(process, cell):
231 async def _stream_communicate(process, cell):
232 process.stdin.write(cell)
232 process.stdin.write(cell)
233 process.stdin.close()
233 process.stdin.close()
234 stdout_task = asyncio.create_task(
234 stdout_task = asyncio.create_task(
235 _handle_stream(process.stdout, args.out, sys.stdout)
235 _handle_stream(process.stdout, args.out, sys.stdout)
236 )
236 )
237 stderr_task = asyncio.create_task(
237 stderr_task = asyncio.create_task(
238 _handle_stream(process.stderr, args.err, sys.stderr)
238 _handle_stream(process.stderr, args.err, sys.stderr)
239 )
239 )
240 await asyncio.wait([stdout_task, stderr_task])
240 await asyncio.wait([stdout_task, stderr_task])
241 await process.wait()
241 await process.wait()
242
242
243 argv = arg_split(line, posix=not sys.platform.startswith("win"))
243 argv = arg_split(line, posix=not sys.platform.startswith("win"))
244 args, cmd = self.shebang.parser.parse_known_args(argv)
244 args, cmd = self.shebang.parser.parse_known_args(argv)
245
245
246 try:
246 try:
247 p = in_thread(
247 p = in_thread(
248 asyncio.create_subprocess_exec(
248 asyncio.create_subprocess_exec(
249 *cmd,
249 *cmd,
250 stdout=asyncio.subprocess.PIPE,
250 stdout=asyncio.subprocess.PIPE,
251 stderr=asyncio.subprocess.PIPE,
251 stderr=asyncio.subprocess.PIPE,
252 stdin=asyncio.subprocess.PIPE,
252 stdin=asyncio.subprocess.PIPE,
253 )
253 )
254 )
254 )
255 except OSError as e:
255 except OSError as e:
256 if e.errno == errno.ENOENT:
256 if e.errno == errno.ENOENT:
257 print("Couldn't find program: %r" % cmd[0])
257 print("Couldn't find program: %r" % cmd[0])
258 return
258 return
259 else:
259 else:
260 raise
260 raise
261
261
262 if not cell.endswith('\n'):
262 if not cell.endswith('\n'):
263 cell += '\n'
263 cell += '\n'
264 cell = cell.encode('utf8', 'replace')
264 cell = cell.encode('utf8', 'replace')
265 if args.bg:
265 if args.bg:
266 self.bg_processes.append(p)
266 self.bg_processes.append(p)
267 self._gc_bg_processes()
267 self._gc_bg_processes()
268 to_close = []
268 to_close = []
269 if args.out:
269 if args.out:
270 self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
270 self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
271 else:
271 else:
272 to_close.append(p.stdout)
272 to_close.append(p.stdout)
273 if args.err:
273 if args.err:
274 self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
274 self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
275 else:
275 else:
276 to_close.append(p.stderr)
276 to_close.append(p.stderr)
277 event_loop.call_soon_threadsafe(
277 event_loop.call_soon_threadsafe(
278 lambda: asyncio.Task(self._run_script(p, cell, to_close))
278 lambda: asyncio.Task(self._run_script(p, cell, to_close))
279 )
279 )
280 if args.proc:
280 if args.proc:
281 proc_proxy = _AsyncIOProxy(p, event_loop)
281 proc_proxy = _AsyncIOProxy(p, event_loop)
282 proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
282 proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
283 proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
283 proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
284 self.shell.user_ns[args.proc] = proc_proxy
284 self.shell.user_ns[args.proc] = proc_proxy
285 return
285 return
286
286
287 try:
287 try:
288 in_thread(_stream_communicate(p, cell))
288 in_thread(_stream_communicate(p, cell))
289 except KeyboardInterrupt:
289 except KeyboardInterrupt:
290 try:
290 try:
291 p.send_signal(signal.SIGINT)
291 p.send_signal(signal.SIGINT)
292 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
292 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
293 if p.returncode is not None:
293 if p.returncode is not None:
294 print("Process is interrupted.")
294 print("Process is interrupted.")
295 return
295 return
296 p.terminate()
296 p.terminate()
297 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
297 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
298 if p.returncode is not None:
298 if p.returncode is not None:
299 print("Process is terminated.")
299 print("Process is terminated.")
300 return
300 return
301 p.kill()
301 p.kill()
302 print("Process is killed.")
302 print("Process is killed.")
303 except OSError:
303 except OSError:
304 pass
304 pass
305 except Exception as e:
305 except Exception as e:
306 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
306 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
307 return
307 return
308
308
309 if args.raise_error and p.returncode != 0:
309 if args.raise_error and p.returncode != 0:
310 # If we get here and p.returncode is still None, we must have
310 # If we get here and p.returncode is still None, we must have
311 # killed it but not yet seen its return code. We don't wait for it,
311 # killed it but not yet seen its return code. We don't wait for it,
312 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
312 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
313 rc = p.returncode or -9
313 rc = p.returncode or -9
314 raise CalledProcessError(rc, cell)
314 raise CalledProcessError(rc, cell)
315
315
316 shebang.__skip_doctest__ = os.name != "posix"
316 shebang.__skip_doctest__ = os.name != "posix"
317
317
318 async def _run_script(self, p, cell, to_close):
318 async def _run_script(self, p, cell, to_close):
319 """callback for running the script in the background"""
319 """callback for running the script in the background"""
320
320
321 p.stdin.write(cell)
321 p.stdin.write(cell)
322 await p.stdin.drain()
322 await p.stdin.drain()
323 p.stdin.close()
323 p.stdin.close()
324 await p.stdin.wait_closed()
324 await p.stdin.wait_closed()
325 await p.wait()
325 await p.wait()
326 # asyncio read pipes have no close
326 # asyncio read pipes have no close
327 # but we should drain the data anyway
327 # but we should drain the data anyway
328 for s in to_close:
328 for s in to_close:
329 await s.read()
329 await s.read()
330 self._gc_bg_processes()
330 self._gc_bg_processes()
331
331
332 @line_magic("killbgscripts")
332 @line_magic("killbgscripts")
333 def killbgscripts(self, _nouse_=''):
333 def killbgscripts(self, _nouse_=''):
334 """Kill all BG processes started by %%script and its family."""
334 """Kill all BG processes started by %%script and its family."""
335 self.kill_bg_processes()
335 self.kill_bg_processes()
336 print("All background processes were killed.")
336 print("All background processes were killed.")
337
337
338 def kill_bg_processes(self):
338 def kill_bg_processes(self):
339 """Kill all BG processes which are still running."""
339 """Kill all BG processes which are still running."""
340 if not self.bg_processes:
340 if not self.bg_processes:
341 return
341 return
342 for p in self.bg_processes:
342 for p in self.bg_processes:
343 if p.returncode is None:
343 if p.returncode is None:
344 try:
344 try:
345 p.send_signal(signal.SIGINT)
345 p.send_signal(signal.SIGINT)
346 except:
346 except:
347 pass
347 pass
348 time.sleep(0.1)
348 time.sleep(0.1)
349 self._gc_bg_processes()
349 self._gc_bg_processes()
350 if not self.bg_processes:
350 if not self.bg_processes:
351 return
351 return
352 for p in self.bg_processes:
352 for p in self.bg_processes:
353 if p.returncode is None:
353 if p.returncode is None:
354 try:
354 try:
355 p.terminate()
355 p.terminate()
356 except:
356 except:
357 pass
357 pass
358 time.sleep(0.1)
358 time.sleep(0.1)
359 self._gc_bg_processes()
359 self._gc_bg_processes()
360 if not self.bg_processes:
360 if not self.bg_processes:
361 return
361 return
362 for p in self.bg_processes:
362 for p in self.bg_processes:
363 if p.returncode is None:
363 if p.returncode is None:
364 try:
364 try:
365 p.kill()
365 p.kill()
366 except:
366 except:
367 pass
367 pass
368 self._gc_bg_processes()
368 self._gc_bg_processes()
369
369
370 def _gc_bg_processes(self):
370 def _gc_bg_processes(self):
371 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
371 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
General Comments 0
You need to be logged in to leave comments. Login now