##// END OF EJS Templates
.
Garland Zhang -
Show More
@@ -1,371 +1,371 b''
1 1 """Magic functions for running cells in various scripts."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import asyncio
7 7 import asyncio.exceptions
8 8 import atexit
9 9 import errno
10 10 import os
11 11 import signal
12 12 import sys
13 13 import time
14 14 from subprocess import CalledProcessError
15 15 from threading import Thread
16 16
17 17 from traitlets import Any, Dict, List, default
18 18
19 19 from IPython.core import magic_arguments
20 20 from IPython.core.async_helpers import _AsyncIOProxy
21 21 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
22 22 from IPython.utils.process import arg_split
23 23
24 24 #-----------------------------------------------------------------------------
25 25 # Magic implementation classes
26 26 #-----------------------------------------------------------------------------
27 27
28 28 def script_args(f):
29 29 """single decorator for adding script args"""
30 30 args = [
31 31 magic_arguments.argument(
32 32 '--out', type=str,
33 33 help="""The variable in which to store stdout from the script.
34 34 If the script is backgrounded, this will be the stdout *pipe*,
35 35 instead of the stderr text itself and will not be auto closed.
36 36 """
37 37 ),
38 38 magic_arguments.argument(
39 39 '--err', type=str,
40 40 help="""The variable in which to store stderr from the script.
41 41 If the script is backgrounded, this will be the stderr *pipe*,
42 42 instead of the stderr text itself and will not be autoclosed.
43 43 """
44 44 ),
45 45 magic_arguments.argument(
46 46 '--bg', action="store_true",
47 47 help="""Whether to run the script in the background.
48 48 If given, the only way to see the output of the command is
49 49 with --out/err.
50 50 """
51 51 ),
52 52 magic_arguments.argument(
53 53 '--proc', type=str,
54 54 help="""The variable in which to store Popen instance.
55 55 This is used only when --bg option is given.
56 56 """
57 57 ),
58 58 magic_arguments.argument(
59 59 '--no-raise-error', action="store_false", dest='raise_error',
60 60 help="""Whether you should raise an error message in addition to
61 61 a stream on stderr if you get a nonzero exit code.
62 62 """,
63 63 ),
64 64 ]
65 65 for arg in args:
66 66 f = arg(f)
67 67 return f
68 68
69 69
70 70 @magics_class
71 71 class ScriptMagics(Magics):
72 72 """Magics for talking to scripts
73 73
74 74 This defines a base `%%script` cell magic for running a cell
75 75 with a program in a subprocess, and registers a few top-level
76 76 magics that call %%script with common interpreters.
77 77 """
78 78
79 79 event_loop = Any(
80 80 help="""
81 81 The event loop on which to run subprocesses
82 82
83 83 Not the main event loop,
84 84 because we want to be able to make blocking calls
85 85 and have certain requirements we don't want to impose on the main loop.
86 86 """
87 87 )
88 88
89 89 script_magics = List(
90 90 help="""Extra script cell magics to define
91 91
92 92 This generates simple wrappers of `%%script foo` as `%%foo`.
93 93
94 94 If you want to add script magics that aren't on your path,
95 95 specify them in script_paths
96 96 """,
97 97 ).tag(config=True)
98 98 @default('script_magics')
99 99 def _script_magics_default(self):
100 100 """default to a common list of programs"""
101 101
102 102 defaults = [
103 103 'sh',
104 104 'bash',
105 105 'perl',
106 106 'ruby',
107 107 'python',
108 108 'python2',
109 109 'python3',
110 110 'pypy',
111 111 ]
112 112 if os.name == 'nt':
113 113 defaults.extend([
114 114 'cmd',
115 115 ])
116 116
117 117 return defaults
118 118
119 119 script_paths = Dict(
120 120 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
121 121
122 122 Only necessary for items in script_magics where the default path will not
123 123 find the right interpreter.
124 124 """
125 125 ).tag(config=True)
126 126
127 127 def __init__(self, shell=None):
128 128 super(ScriptMagics, self).__init__(shell=shell)
129 129 self._generate_script_magics()
130 130 self.bg_processes = []
131 131 atexit.register(self.kill_bg_processes)
132 132
133 133 def __del__(self):
134 134 self.kill_bg_processes()
135 135
136 136 def _generate_script_magics(self):
137 137 cell_magics = self.magics['cell']
138 138 for name in self.script_magics:
139 139 cell_magics[name] = self._make_script_magic(name)
140 140
141 141 def _make_script_magic(self, name):
142 142 """make a named magic, that calls %%script with a particular program"""
143 143 # expand to explicit path if necessary:
144 144 script = self.script_paths.get(name, name)
145 145
146 146 @magic_arguments.magic_arguments()
147 147 @script_args
148 148 def named_script_magic(line, cell):
149 149 # if line, add it as cl-flags
150 150 if line:
151 151 line = "%s %s" % (script, line)
152 152 else:
153 153 line = script
154 154 return self.shebang(line, cell)
155 155
156 156 # write a basic docstring:
157 157 named_script_magic.__doc__ = \
158 158 """%%{name} script magic
159 159
160 160 Run cells with {script} in a subprocess.
161 161
162 162 This is a shortcut for `%%script {script}`
163 163 """.format(**locals())
164 164
165 165 return named_script_magic
166 166
167 167 @magic_arguments.magic_arguments()
168 168 @script_args
169 169 @cell_magic("script")
170 170 def shebang(self, line, cell):
171 171 """Run a cell via a shell command
172 172
173 173 The `%%script` line is like the #! line of script,
174 174 specifying a program (bash, perl, ruby, etc.) with which to run.
175 175
176 176 The rest of the cell is run by that program.
177 177
178 178 Examples
179 179 --------
180 180 ::
181 181
182 182 In [1]: %%script bash
183 183 ...: for i in 1 2 3; do
184 184 ...: echo $i
185 185 ...: done
186 186 1
187 187 2
188 188 3
189 189 """
190 190
191 191 # Create the event loop in which to run script magics
192 192 # this operates on a background thread
193 193 if self.event_loop is None:
194 194 if sys.platform == "win32":
195 195 # don't override the current policy,
196 196 # just create an event loop
197 197 event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
198 198 else:
199 199 event_loop = asyncio.new_event_loop()
200 200 self.event_loop = event_loop
201 201
202 202 # start the loop in a background thread
203 203 asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
204 204 asyncio_thread.start()
205 205 else:
206 206 event_loop = self.event_loop
207 207
208 208 def in_thread(coro):
209 209 """Call a coroutine on the asyncio thread"""
210 210 return asyncio.run_coroutine_threadsafe(coro, event_loop).result()
211 211
212 212 async def _readchunk(stream):
213 213 try:
214 return await stream.readuntil(b'\n')
214 return await stream.readuntil(b"\n")
215 215 except asyncio.exceptions.IncompleteReadError as e:
216 216 return e.partial
217 217 except asyncio.exceptions.LimitOverrunError as e:
218 218 return await stream.read(e.consumed)
219 219
220 220 async def _handle_stream(stream, stream_arg, file_object):
221 221 while True:
222 222 chunk = (await _readchunk(stream)).decode("utf8", errors="replace")
223 223 if not chunk:
224 224 break
225 225 if stream_arg:
226 226 self.shell.user_ns[stream_arg] = chunk
227 227 else:
228 228 file_object.write(chunk)
229 229 file_object.flush()
230 230
231 231 async def _stream_communicate(process, cell):
232 232 process.stdin.write(cell)
233 233 process.stdin.close()
234 234 stdout_task = asyncio.create_task(
235 235 _handle_stream(process.stdout, args.out, sys.stdout)
236 236 )
237 237 stderr_task = asyncio.create_task(
238 238 _handle_stream(process.stderr, args.err, sys.stderr)
239 239 )
240 240 await asyncio.wait([stdout_task, stderr_task])
241 241 await process.wait()
242 242
243 243 argv = arg_split(line, posix=not sys.platform.startswith("win"))
244 244 args, cmd = self.shebang.parser.parse_known_args(argv)
245 245
246 246 try:
247 247 p = in_thread(
248 248 asyncio.create_subprocess_exec(
249 249 *cmd,
250 250 stdout=asyncio.subprocess.PIPE,
251 251 stderr=asyncio.subprocess.PIPE,
252 252 stdin=asyncio.subprocess.PIPE,
253 253 )
254 254 )
255 255 except OSError as e:
256 256 if e.errno == errno.ENOENT:
257 257 print("Couldn't find program: %r" % cmd[0])
258 258 return
259 259 else:
260 260 raise
261 261
262 262 if not cell.endswith('\n'):
263 263 cell += '\n'
264 264 cell = cell.encode('utf8', 'replace')
265 265 if args.bg:
266 266 self.bg_processes.append(p)
267 267 self._gc_bg_processes()
268 268 to_close = []
269 269 if args.out:
270 270 self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
271 271 else:
272 272 to_close.append(p.stdout)
273 273 if args.err:
274 274 self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
275 275 else:
276 276 to_close.append(p.stderr)
277 277 event_loop.call_soon_threadsafe(
278 278 lambda: asyncio.Task(self._run_script(p, cell, to_close))
279 279 )
280 280 if args.proc:
281 281 proc_proxy = _AsyncIOProxy(p, event_loop)
282 282 proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
283 283 proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
284 284 self.shell.user_ns[args.proc] = proc_proxy
285 285 return
286 286
287 287 try:
288 288 in_thread(_stream_communicate(p, cell))
289 289 except KeyboardInterrupt:
290 290 try:
291 291 p.send_signal(signal.SIGINT)
292 292 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
293 293 if p.returncode is not None:
294 294 print("Process is interrupted.")
295 295 return
296 296 p.terminate()
297 297 in_thread(asyncio.wait_for(p.wait(), timeout=0.1))
298 298 if p.returncode is not None:
299 299 print("Process is terminated.")
300 300 return
301 301 p.kill()
302 302 print("Process is killed.")
303 303 except OSError:
304 304 pass
305 305 except Exception as e:
306 306 print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
307 307 return
308 308
309 309 if args.raise_error and p.returncode != 0:
310 310 # If we get here and p.returncode is still None, we must have
311 311 # killed it but not yet seen its return code. We don't wait for it,
312 312 # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
313 313 rc = p.returncode or -9
314 314 raise CalledProcessError(rc, cell)
315 315
316 316 shebang.__skip_doctest__ = os.name != "posix"
317 317
318 318 async def _run_script(self, p, cell, to_close):
319 319 """callback for running the script in the background"""
320 320
321 321 p.stdin.write(cell)
322 322 await p.stdin.drain()
323 323 p.stdin.close()
324 324 await p.stdin.wait_closed()
325 325 await p.wait()
326 326 # asyncio read pipes have no close
327 327 # but we should drain the data anyway
328 328 for s in to_close:
329 329 await s.read()
330 330 self._gc_bg_processes()
331 331
332 332 @line_magic("killbgscripts")
333 333 def killbgscripts(self, _nouse_=''):
334 334 """Kill all BG processes started by %%script and its family."""
335 335 self.kill_bg_processes()
336 336 print("All background processes were killed.")
337 337
338 338 def kill_bg_processes(self):
339 339 """Kill all BG processes which are still running."""
340 340 if not self.bg_processes:
341 341 return
342 342 for p in self.bg_processes:
343 343 if p.returncode is None:
344 344 try:
345 345 p.send_signal(signal.SIGINT)
346 346 except:
347 347 pass
348 348 time.sleep(0.1)
349 349 self._gc_bg_processes()
350 350 if not self.bg_processes:
351 351 return
352 352 for p in self.bg_processes:
353 353 if p.returncode is None:
354 354 try:
355 355 p.terminate()
356 356 except:
357 357 pass
358 358 time.sleep(0.1)
359 359 self._gc_bg_processes()
360 360 if not self.bg_processes:
361 361 return
362 362 for p in self.bg_processes:
363 363 if p.returncode is None:
364 364 try:
365 365 p.kill()
366 366 except:
367 367 pass
368 368 self._gc_bg_processes()
369 369
370 370 def _gc_bg_processes(self):
371 371 self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
General Comments 0
You need to be logged in to leave comments. Login now