##// END OF EJS Templates
Read each chunk from the stream until a separator is found or the buffer is full
Garland Zhang -
Show More
@@ -1,362 +1,371 b''
1 """Magic functions for running cells in various scripts."""
1 """Magic functions for running cells in various scripts."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import asyncio
6 import asyncio
7 import asyncio.exceptions
7 import atexit
8 import atexit
8 import errno
9 import errno
9 import os
10 import os
10 import signal
11 import signal
11 import sys
12 import sys
12 import time
13 import time
13 from subprocess import CalledProcessError
14 from subprocess import CalledProcessError
14 from threading import Thread
15 from threading import Thread
15
16
16 from traitlets import Any, Dict, List, default
17 from traitlets import Any, Dict, List, default
17
18
18 from IPython.core import magic_arguments
19 from IPython.core import magic_arguments
19 from IPython.core.async_helpers import _AsyncIOProxy
20 from IPython.core.async_helpers import _AsyncIOProxy
20 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
21 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
21 from IPython.utils.process import arg_split
22 from IPython.utils.process import arg_split
22
23
23 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
24 # Magic implementation classes
25 # Magic implementation classes
25 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
26
27
27 def script_args(f):
28 def script_args(f):
28 """single decorator for adding script args"""
29 """single decorator for adding script args"""
29 args = [
30 args = [
30 magic_arguments.argument(
31 magic_arguments.argument(
31 '--out', type=str,
32 '--out', type=str,
32 help="""The variable in which to store stdout from the script.
33 help="""The variable in which to store stdout from the script.
33 If the script is backgrounded, this will be the stdout *pipe*,
34 If the script is backgrounded, this will be the stdout *pipe*,
34 instead of the stderr text itself and will not be auto closed.
35 instead of the stderr text itself and will not be auto closed.
35 """
36 """
36 ),
37 ),
37 magic_arguments.argument(
38 magic_arguments.argument(
38 '--err', type=str,
39 '--err', type=str,
39 help="""The variable in which to store stderr from the script.
40 help="""The variable in which to store stderr from the script.
40 If the script is backgrounded, this will be the stderr *pipe*,
41 If the script is backgrounded, this will be the stderr *pipe*,
41 instead of the stderr text itself and will not be autoclosed.
42 instead of the stderr text itself and will not be autoclosed.
42 """
43 """
43 ),
44 ),
44 magic_arguments.argument(
45 magic_arguments.argument(
45 '--bg', action="store_true",
46 '--bg', action="store_true",
46 help="""Whether to run the script in the background.
47 help="""Whether to run the script in the background.
47 If given, the only way to see the output of the command is
48 If given, the only way to see the output of the command is
48 with --out/err.
49 with --out/err.
49 """
50 """
50 ),
51 ),
51 magic_arguments.argument(
52 magic_arguments.argument(
52 '--proc', type=str,
53 '--proc', type=str,
53 help="""The variable in which to store Popen instance.
54 help="""The variable in which to store Popen instance.
54 This is used only when --bg option is given.
55 This is used only when --bg option is given.
55 """
56 """
56 ),
57 ),
57 magic_arguments.argument(
58 magic_arguments.argument(
58 '--no-raise-error', action="store_false", dest='raise_error',
59 '--no-raise-error', action="store_false", dest='raise_error',
59 help="""Whether you should raise an error message in addition to
60 help="""Whether you should raise an error message in addition to
60 a stream on stderr if you get a nonzero exit code.
61 a stream on stderr if you get a nonzero exit code.
61 """,
62 """,
62 ),
63 ),
63 ]
64 ]
64 for arg in args:
65 for arg in args:
65 f = arg(f)
66 f = arg(f)
66 return f
67 return f
67
68
68
69
69 @magics_class
70 @magics_class
70 class ScriptMagics(Magics):
71 class ScriptMagics(Magics):
71 """Magics for talking to scripts
72 """Magics for talking to scripts
72
73
73 This defines a base `%%script` cell magic for running a cell
74 This defines a base `%%script` cell magic for running a cell
74 with a program in a subprocess, and registers a few top-level
75 with a program in a subprocess, and registers a few top-level
75 magics that call %%script with common interpreters.
76 magics that call %%script with common interpreters.
76 """
77 """
77
78
78 event_loop = Any(
79 event_loop = Any(
79 help="""
80 help="""
80 The event loop on which to run subprocesses
81 The event loop on which to run subprocesses
81
82
82 Not the main event loop,
83 Not the main event loop,
83 because we want to be able to make blocking calls
84 because we want to be able to make blocking calls
84 and have certain requirements we don't want to impose on the main loop.
85 and have certain requirements we don't want to impose on the main loop.
85 """
86 """
86 )
87 )
87
88
88 script_magics = List(
89 script_magics = List(
89 help="""Extra script cell magics to define
90 help="""Extra script cell magics to define
90
91
91 This generates simple wrappers of `%%script foo` as `%%foo`.
92 This generates simple wrappers of `%%script foo` as `%%foo`.
92
93
93 If you want to add script magics that aren't on your path,
94 If you want to add script magics that aren't on your path,
94 specify them in script_paths
95 specify them in script_paths
95 """,
96 """,
96 ).tag(config=True)
97 ).tag(config=True)
97 @default('script_magics')
98 @default('script_magics')
98 def _script_magics_default(self):
99 def _script_magics_default(self):
99 """default to a common list of programs"""
100 """default to a common list of programs"""
100
101
101 defaults = [
102 defaults = [
102 'sh',
103 'sh',
103 'bash',
104 'bash',
104 'perl',
105 'perl',
105 'ruby',
106 'ruby',
106 'python',
107 'python',
107 'python2',
108 'python2',
108 'python3',
109 'python3',
109 'pypy',
110 'pypy',
110 ]
111 ]
111 if os.name == 'nt':
112 if os.name == 'nt':
112 defaults.extend([
113 defaults.extend([
113 'cmd',
114 'cmd',
114 ])
115 ])
115
116
116 return defaults
117 return defaults
117
118
118 script_paths = Dict(
119 script_paths = Dict(
119 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
120 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
120
121
121 Only necessary for items in script_magics where the default path will not
122 Only necessary for items in script_magics where the default path will not
122 find the right interpreter.
123 find the right interpreter.
123 """
124 """
124 ).tag(config=True)
125 ).tag(config=True)
125
126
126 def __init__(self, shell=None):
127 def __init__(self, shell=None):
127 super(ScriptMagics, self).__init__(shell=shell)
128 super(ScriptMagics, self).__init__(shell=shell)
128 self._generate_script_magics()
129 self._generate_script_magics()
129 self.bg_processes = []
130 self.bg_processes = []
130 atexit.register(self.kill_bg_processes)
131 atexit.register(self.kill_bg_processes)
131
132
132 def __del__(self):
133 def __del__(self):
133 self.kill_bg_processes()
134 self.kill_bg_processes()
134
135
135 def _generate_script_magics(self):
136 def _generate_script_magics(self):
136 cell_magics = self.magics['cell']
137 cell_magics = self.magics['cell']
137 for name in self.script_magics:
138 for name in self.script_magics:
138 cell_magics[name] = self._make_script_magic(name)
139 cell_magics[name] = self._make_script_magic(name)
139
140
140 def _make_script_magic(self, name):
141 def _make_script_magic(self, name):
141 """make a named magic, that calls %%script with a particular program"""
142 """make a named magic, that calls %%script with a particular program"""
142 # expand to explicit path if necessary:
143 # expand to explicit path if necessary:
143 script = self.script_paths.get(name, name)
144 script = self.script_paths.get(name, name)
144
145
145 @magic_arguments.magic_arguments()
146 @magic_arguments.magic_arguments()
146 @script_args
147 @script_args
147 def named_script_magic(line, cell):
148 def named_script_magic(line, cell):
148 # if line, add it as cl-flags
149 # if line, add it as cl-flags
149 if line:
150 if line:
150 line = "%s %s" % (script, line)
151 line = "%s %s" % (script, line)
151 else:
152 else:
152 line = script
153 line = script
153 return self.shebang(line, cell)
154 return self.shebang(line, cell)
154
155
155 # write a basic docstring:
156 # write a basic docstring:
156 named_script_magic.__doc__ = \
157 named_script_magic.__doc__ = \
157 """%%{name} script magic
158 """%%{name} script magic
158
159
159 Run cells with {script} in a subprocess.
160 Run cells with {script} in a subprocess.
160
161
161 This is a shortcut for `%%script {script}`
162 This is a shortcut for `%%script {script}`
162 """.format(**locals())
163 """.format(**locals())
163
164
164 return named_script_magic
165 return named_script_magic
165
166
166 @magic_arguments.magic_arguments()
167 @magic_arguments.magic_arguments()
167 @script_args
168 @script_args
168 @cell_magic("script")
169 @cell_magic("script")
169 def shebang(self, line, cell):
170 def shebang(self, line, cell):
170 """Run a cell via a shell command
171 """Run a cell via a shell command
171
172
172 The `%%script` line is like the #! line of script,
173 The `%%script` line is like the #! line of script,
173 specifying a program (bash, perl, ruby, etc.) with which to run.
174 specifying a program (bash, perl, ruby, etc.) with which to run.
174
175
175 The rest of the cell is run by that program.
176 The rest of the cell is run by that program.
176
177
177 Examples
178 Examples
178 --------
179 --------
179 ::
180 ::
180
181
181 In [1]: %%script bash
182 In [1]: %%script bash
182 ...: for i in 1 2 3; do
183 ...: for i in 1 2 3; do
183 ...: echo $i
184 ...: echo $i
184 ...: done
185 ...: done
185 1
186 1
186 2
187 2
187 3
188 3
188 """
189 """
189
190
190 # Create the event loop in which to run script magics
191 # Create the event loop in which to run script magics
191 # this operates on a background thread
192 # this operates on a background thread
192 if self.event_loop is None:
193 if self.event_loop is None:
193 if sys.platform == "win32":
194 if sys.platform == "win32":
194 # don't override the current policy,
195 # don't override the current policy,
195 # just create an event loop
196 # just create an event loop
196 event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
197 event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
197 else:
198 else:
198 event_loop = asyncio.new_event_loop()
199 event_loop = asyncio.new_event_loop()
199 self.event_loop = event_loop
200 self.event_loop = event_loop
200
201
201 # start the loop in a background thread
202 # start the loop in a background thread
202 asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
203 asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
203 asyncio_thread.start()
204 asyncio_thread.start()
204 else:
205 else:
205 event_loop = self.event_loop
206 event_loop = self.event_loop
206
207
207 def in_thread(coro):
208 def in_thread(coro):
208 """Call a coroutine on the asyncio thread"""
209 """Call a coroutine on the asyncio thread"""
209 return asyncio.run_coroutine_threadsafe(coro, event_loop).result()
210 return asyncio.run_coroutine_threadsafe(coro, event_loop).result()
210
211
212 async def _readchunk(stream):
213 try:
214 return await stream.readuntil(b'\n')
215 except asyncio.exceptions.IncompleteReadError as e:
216 return e.partial
217 except asyncio.exceptions.LimitOverrunError as e:
218 return await stream.read(e.consumed)
219
211 async def _handle_stream(stream, stream_arg, file_object):
220 async def _handle_stream(stream, stream_arg, file_object):
212 while True:
221 while True:
213 line = (await stream.readline()).decode("utf8", errors="replace")
222 chunk = (await _readchunk(stream)).decode("utf8", errors="replace")
214 if not line:
223 if not chunk:
215 break
224 break
216 if stream_arg:
225 if stream_arg:
217 self.shell.user_ns[stream_arg] = line
226 self.shell.user_ns[stream_arg] = chunk
218 else:
227 else:
219 file_object.write(line)
228 file_object.write(chunk)
220 file_object.flush()
229 file_object.flush()
221
230
222 async def _stream_communicate(process, cell):
231 async def _stream_communicate(process, cell):
223 process.stdin.write(cell)
232 process.stdin.write(cell)
224 process.stdin.close()
233 process.stdin.close()
225 stdout_task = asyncio.create_task(
234 stdout_task = asyncio.create_task(
226 _handle_stream(process.stdout, args.out, sys.stdout)
235 _handle_stream(process.stdout, args.out, sys.stdout)
227 )
236 )
228 stderr_task = asyncio.create_task(
237 stderr_task = asyncio.create_task(
229 _handle_stream(process.stderr, args.err, sys.stderr)
238 _handle_stream(process.stderr, args.err, sys.stderr)
230 )
239 )
231 await asyncio.wait([stdout_task, stderr_task])
240 await asyncio.wait([stdout_task, stderr_task])
232 await process.wait()
241 await process.wait()
233
242
234 argv = arg_split(line, posix=not sys.platform.startswith("win"))
243 argv = arg_split(line, posix=not sys.platform.startswith("win"))
235 args, cmd = self.shebang.parser.parse_known_args(argv)
244 args, cmd = self.shebang.parser.parse_known_args(argv)
236
245
237 try:
246 try:
238 p = in_thread(
247 p = in_thread(
239 asyncio.create_subprocess_exec(
248 asyncio.create_subprocess_exec(
240 *cmd,
249 *cmd,
241 stdout=asyncio.subprocess.PIPE,
250 stdout=asyncio.subprocess.PIPE,
242 stderr=asyncio.subprocess.PIPE,
251 stderr=asyncio.subprocess.PIPE,
243 stdin=asyncio.subprocess.PIPE,
252 stdin=asyncio.subprocess.PIPE,
244 )
253 )
245 )
254 )
246 except OSError as e:
255 except OSError as e:
247 if e.errno == errno.ENOENT:
256 if e.errno == errno.ENOENT:
248 print("Couldn't find program: %r" % cmd[0])
257 print("Couldn't find program: %r" % cmd[0])
249 return
258 return
250 else:
259 else:
251 raise
260 raise
252
261
253 if not cell.endswith('\n'):
262 if not cell.endswith('\n'):
254 cell += '\n'
263 cell += '\n'
255 cell = cell.encode('utf8', 'replace')
264 cell = cell.encode('utf8', 'replace')
256 if args.bg:
265 if args.bg:
257 self.bg_processes.append(p)
266 self.bg_processes.append(p)
258 self._gc_bg_processes()
267 self._gc_bg_processes()
259 to_close = []
268 to_close = []
260 if args.out:
269 if args.out:
261 self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
270 self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
262 else:
271 else:
263 to_close.append(p.stdout)
272 to_close.append(p.stdout)
264 if args.err:
273 if args.err:
265 self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
274 self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
266 else:
275 else:
267 to_close.append(p.stderr)
276 to_close.append(p.stderr)
268 event_loop.call_soon_threadsafe(
277 event_loop.call_soon_threadsafe(
269 lambda: asyncio.Task(self._run_script(p, cell, to_close))
278 lambda: asyncio.Task(self._run_script(p, cell, to_close))
270 )
279 )
271 if args.proc:
280 if args.proc:
272 proc_proxy = _AsyncIOProxy(p, event_loop)
281 proc_proxy = _AsyncIOProxy(p, event_loop)
273 proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
282 proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
274 proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
283 proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
275 self.shell.user_ns[args.proc] = proc_proxy
284 self.shell.user_ns[args.proc] = proc_proxy
276 return
285 return
277
286
278 try:
287 try:
279 in_thread(_stream_communicate(p, cell))
288 in_thread(_stream_communicate(p, cell))
280 except KeyboardInterrupt:
289 except KeyboardInterrupt:
281 try:
290 try:
282 p.send_signal(signal.SIGINT)
291 p.send_signal(signal.SIGINT)
283 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
292 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
284 if p.returncode is not None:
293 if p.returncode is not None:
285 print("Process is interrupted.")
294 print("Process is interrupted.")
286 return
295 return
287 p.terminate()
296 p.terminate()
288 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
297 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
289 if p.returncode is not None:
298 if p.returncode is not None:
290 print("Process is terminated.")
299 print("Process is terminated.")
291 return
300 return
292 p.kill()
301 p.kill()
293 print("Process is killed.")
302 print("Process is killed.")
294 except OSError:
303 except OSError:
295 pass
304 pass
296 except Exception as e:
305 except Exception as e:
297 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
306 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
298 return
307 return
299
308
300 if args.raise_error and p.returncode != 0:
309 if args.raise_error and p.returncode != 0:
301 # If we get here and p.returncode is still None, we must have
310 # If we get here and p.returncode is still None, we must have
302 # killed it but not yet seen its return code. We don't wait for it,
311 # killed it but not yet seen its return code. We don't wait for it,
303 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
312 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
304 rc = p.returncode or -9
313 rc = p.returncode or -9
305 raise CalledProcessError(rc, cell)
314 raise CalledProcessError(rc, cell)
306
315
307 shebang.__skip_doctest__ = os.name != "posix"
316 shebang.__skip_doctest__ = os.name != "posix"
308
317
309 async def _run_script(self, p, cell, to_close):
318 async def _run_script(self, p, cell, to_close):
310 """callback for running the script in the background"""
319 """callback for running the script in the background"""
311
320
312 p.stdin.write(cell)
321 p.stdin.write(cell)
313 await p.stdin.drain()
322 await p.stdin.drain()
314 p.stdin.close()
323 p.stdin.close()
315 await p.stdin.wait_closed()
324 await p.stdin.wait_closed()
316 await p.wait()
325 await p.wait()
317 # asyncio read pipes have no close
326 # asyncio read pipes have no close
318 # but we should drain the data anyway
327 # but we should drain the data anyway
319 for s in to_close:
328 for s in to_close:
320 await s.read()
329 await s.read()
321 self._gc_bg_processes()
330 self._gc_bg_processes()
322
331
323 @line_magic("killbgscripts")
332 @line_magic("killbgscripts")
324 def killbgscripts(self, _nouse_=''):
333 def killbgscripts(self, _nouse_=''):
325 """Kill all BG processes started by %%script and its family."""
334 """Kill all BG processes started by %%script and its family."""
326 self.kill_bg_processes()
335 self.kill_bg_processes()
327 print("All background processes were killed.")
336 print("All background processes were killed.")
328
337
329 def kill_bg_processes(self):
338 def kill_bg_processes(self):
330 """Kill all BG processes which are still running."""
339 """Kill all BG processes which are still running."""
331 if not self.bg_processes:
340 if not self.bg_processes:
332 return
341 return
333 for p in self.bg_processes:
342 for p in self.bg_processes:
334 if p.returncode is None:
343 if p.returncode is None:
335 try:
344 try:
336 p.send_signal(signal.SIGINT)
345 p.send_signal(signal.SIGINT)
337 except:
346 except:
338 pass
347 pass
339 time.sleep(0.1)
348 time.sleep(0.1)
340 self._gc_bg_processes()
349 self._gc_bg_processes()
341 if not self.bg_processes:
350 if not self.bg_processes:
342 return
351 return
343 for p in self.bg_processes:
352 for p in self.bg_processes:
344 if p.returncode is None:
353 if p.returncode is None:
345 try:
354 try:
346 p.terminate()
355 p.terminate()
347 except:
356 except:
348 pass
357 pass
349 time.sleep(0.1)
358 time.sleep(0.1)
350 self._gc_bg_processes()
359 self._gc_bg_processes()
351 if not self.bg_processes:
360 if not self.bg_processes:
352 return
361 return
353 for p in self.bg_processes:
362 for p in self.bg_processes:
354 if p.returncode is None:
363 if p.returncode is None:
355 try:
364 try:
356 p.kill()
365 p.kill()
357 except:
366 except:
358 pass
367 pass
359 self._gc_bg_processes()
368 self._gc_bg_processes()
360
369
361 def _gc_bg_processes(self):
370 def _gc_bg_processes(self):
362 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
371 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
General Comments 0
You need to be logged in to leave comments. Login now