##// END OF EJS Templates
Read each chunk from the stream until the separator is found or the buffer is full
Garland Zhang -
Show More
@@ -1,362 +1,371 b''
1 1 """Magic functions for running cells in various scripts."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import asyncio
7 import asyncio.exceptions
7 8 import atexit
8 9 import errno
9 10 import os
10 11 import signal
11 12 import sys
12 13 import time
13 14 from subprocess import CalledProcessError
14 15 from threading import Thread
15 16
16 17 from traitlets import Any, Dict, List, default
17 18
18 19 from IPython.core import magic_arguments
19 20 from IPython.core.async_helpers import _AsyncIOProxy
20 21 from IPython.core.magic import Magics, cell_magic, line_magic, magics_class
21 22 from IPython.utils.process import arg_split
22 23
23 24 #-----------------------------------------------------------------------------
24 25 # Magic implementation classes
25 26 #-----------------------------------------------------------------------------
26 27
def script_args(f):
    """Decorate `f` with the command-line arguments shared by script magics.

    Adds ``--out``, ``--err``, ``--bg``, ``--proc`` and ``--no-raise-error``
    by applying each ``magic_arguments.argument`` decorator to `f` in turn.

    Parameters
    ----------
    f : callable
        The magic function to decorate.

    Returns
    -------
    callable
        The result of applying every argument decorator to `f`.
    """
    args = [
        magic_arguments.argument(
            '--out', type=str,
            help="""The variable in which to store stdout from the script.
            If the script is backgrounded, this will be the stdout *pipe*,
            instead of the stdout text itself and will not be auto closed.
            """
        ),
        magic_arguments.argument(
            '--err', type=str,
            help="""The variable in which to store stderr from the script.
            If the script is backgrounded, this will be the stderr *pipe*,
            instead of the stderr text itself and will not be autoclosed.
            """
        ),
        magic_arguments.argument(
            '--bg', action="store_true",
            help="""Whether to run the script in the background.
            If given, the only way to see the output of the command is
            with --out/err.
            """
        ),
        magic_arguments.argument(
            '--proc', type=str,
            help="""The variable in which to store Popen instance.
            This is used only when --bg option is given.
            """
        ),
        magic_arguments.argument(
            '--no-raise-error', action="store_false", dest='raise_error',
            help="""Whether you should raise an error message in addition to
            a stream on stderr if you get a nonzero exit code.
            """,
        ),
    ]
    for arg in args:
        f = arg(f)
    return f
67 68
68 69
@magics_class
class ScriptMagics(Magics):
    """Magics for talking to scripts

    This defines a base `%%script` cell magic for running a cell
    with a program in a subprocess, and registers a few top-level
    magics that call %%script with common interpreters.
    """

    event_loop = Any(
        help="""
        The event loop on which to run subprocesses

        Not the main event loop,
        because we want to be able to make blocking calls
        and have certain requirements we don't want to impose on the main loop.
        """
    )

    script_magics = List(
        help="""Extra script cell magics to define

        This generates simple wrappers of `%%script foo` as `%%foo`.

        If you want to add script magics that aren't on your path,
        specify them in script_paths
        """,
    ).tag(config=True)

    @default('script_magics')
    def _script_magics_default(self):
        """default to a common list of programs"""

        defaults = [
            'sh',
            'bash',
            'perl',
            'ruby',
            'python',
            'python2',
            'python3',
            'pypy',
        ]
        if os.name == 'nt':
            defaults.extend([
                'cmd',
            ])

        return defaults

    script_paths = Dict(
        help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'

        Only necessary for items in script_magics where the default path will not
        find the right interpreter.
        """
    ).tag(config=True)

    def __init__(self, shell=None):
        """Register the per-program script magics and the exit-time cleanup."""
        super().__init__(shell=shell)
        # Set before anything else that can fail, so that __del__ /
        # kill_bg_processes always find the attribute.
        self.bg_processes = []
        self._generate_script_magics()
        atexit.register(self.kill_bg_processes)

    def __del__(self):
        """Kill any background processes that are still running."""
        self.kill_bg_processes()

    def _generate_script_magics(self):
        """Register one cell magic for each name in `script_magics`."""
        cell_magics = self.magics['cell']
        for name in self.script_magics:
            cell_magics[name] = self._make_script_magic(name)

    def _make_script_magic(self, name):
        """make a named magic, that calls %%script with a particular program"""
        # expand to explicit path if necessary:
        script = self.script_paths.get(name, name)

        @magic_arguments.magic_arguments()
        @script_args
        def named_script_magic(line, cell):
            # if line, add it as cl-flags
            if line:
                line = "%s %s" % (script, line)
            else:
                line = script
            return self.shebang(line, cell)

        # write a basic docstring:
        named_script_magic.__doc__ = \
        """%%{name} script magic

        Run cells with {script} in a subprocess.

        This is a shortcut for `%%script {script}`
        """.format(name=name, script=script)

        return named_script_magic

    @magic_arguments.magic_arguments()
    @script_args
    @cell_magic("script")
    def shebang(self, line, cell):
        """Run a cell via a shell command

        The `%%script` line is like the #! line of script,
        specifying a program (bash, perl, ruby, etc.) with which to run.

        The rest of the cell is run by that program.

        Examples
        --------
        ::

            In [1]: %%script bash
               ...: for i in 1 2 3; do
               ...:   echo $i
               ...: done
            1
            2
            3
        """

        # Parse the magic line first; the nested helpers below read `args`.
        argv = arg_split(line, posix=not sys.platform.startswith("win"))
        args, cmd = self.shebang.parser.parse_known_args(argv)

        # Create the event loop in which to run script magics
        # this operates on a background thread
        if self.event_loop is None:
            if sys.platform == "win32":
                # don't override the current policy,
                # just create an event loop
                event_loop = asyncio.WindowsProactorEventLoopPolicy().new_event_loop()
            else:
                event_loop = asyncio.new_event_loop()
            self.event_loop = event_loop

            # start the loop in a background thread
            asyncio_thread = Thread(target=event_loop.run_forever, daemon=True)
            asyncio_thread.start()
        else:
            event_loop = self.event_loop

        def in_thread(coro):
            """Call a coroutine on the asyncio thread"""
            return asyncio.run_coroutine_threadsafe(coro, event_loop).result()

        def _wait_briefly(process, timeout=0.1):
            """Wait up to `timeout` seconds; return True if `process` has exited."""
            try:
                in_thread(asyncio.wait_for(process.wait(), timeout=timeout))
            except asyncio.TimeoutError:
                # wait_for raises on timeout; that only means "still running",
                # so the caller can escalate to a stronger signal.
                pass
            return process.returncode is not None

        async def _readchunk(stream):
            """Read up to and including the next newline, or whatever is available."""
            try:
                return await stream.readuntil(b'\n')
            except asyncio.IncompleteReadError as e:
                # EOF before a newline: hand back what was read (b"" at EOF)
                return e.partial
            except asyncio.LimitOverrunError as e:
                # No newline within the stream's buffer limit: drain the
                # bytes that were scanned so far.
                return await stream.read(e.consumed)

        async def _handle_stream(stream, stream_arg, file_object):
            """Forward `stream` to `file_object`, or capture it as a user variable.

            When `stream_arg` is set, the complete decoded output is stored
            in ``user_ns[stream_arg]`` once the stream reaches EOF;
            otherwise each chunk is written to `file_object` as it arrives.
            """
            captured = []
            while True:
                chunk = (await _readchunk(stream)).decode("utf8", errors="replace")
                if not chunk:
                    break
                if stream_arg:
                    # accumulate: assigning each chunk directly would leave
                    # only the last line in the variable
                    captured.append(chunk)
                else:
                    file_object.write(chunk)
                    file_object.flush()
            if stream_arg:
                self.shell.user_ns[stream_arg] = "".join(captured)

        async def _stream_communicate(process, cell):
            """Feed `cell` to the process and pump both output streams until exit."""
            process.stdin.write(cell)
            process.stdin.close()
            stdout_task = asyncio.create_task(
                _handle_stream(process.stdout, args.out, sys.stdout)
            )
            stderr_task = asyncio.create_task(
                _handle_stream(process.stderr, args.err, sys.stderr)
            )
            await asyncio.wait([stdout_task, stderr_task])
            await process.wait()

        try:
            p = in_thread(
                asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    stdin=asyncio.subprocess.PIPE,
                )
            )
        except OSError as e:
            if e.errno == errno.ENOENT:
                print("Couldn't find program: %r" % cmd[0])
                return
            else:
                raise

        if not cell.endswith('\n'):
            cell += '\n'
        cell = cell.encode('utf8', 'replace')
        if args.bg:
            self.bg_processes.append(p)
            self._gc_bg_processes()
            # pipes not handed to the user are drained by _run_script
            to_close = []
            if args.out:
                self.shell.user_ns[args.out] = _AsyncIOProxy(p.stdout, event_loop)
            else:
                to_close.append(p.stdout)
            if args.err:
                self.shell.user_ns[args.err] = _AsyncIOProxy(p.stderr, event_loop)
            else:
                to_close.append(p.stderr)
            event_loop.call_soon_threadsafe(
                lambda: asyncio.Task(self._run_script(p, cell, to_close))
            )
            if args.proc:
                proc_proxy = _AsyncIOProxy(p, event_loop)
                proc_proxy.stdout = _AsyncIOProxy(p.stdout, event_loop)
                proc_proxy.stderr = _AsyncIOProxy(p.stderr, event_loop)
                self.shell.user_ns[args.proc] = proc_proxy
            return

        try:
            in_thread(_stream_communicate(p, cell))
        except KeyboardInterrupt:
            # Escalate: SIGINT, then terminate, then kill.
            try:
                p.send_signal(signal.SIGINT)
                if _wait_briefly(p):
                    print("Process is interrupted.")
                    return
                p.terminate()
                if _wait_briefly(p):
                    print("Process is terminated.")
                    return
                p.kill()
                print("Process is killed.")
            except OSError:
                pass
            except Exception as e:
                print("Error while terminating subprocess (pid=%i): %s" % (p.pid, e))
            return

        if args.raise_error and p.returncode != 0:
            # If we get here and p.returncode is still None, we must have
            # killed it but not yet seen its return code. We don't wait for it,
            # in case it's stuck in uninterruptible sleep. -9 = SIGKILL
            rc = p.returncode or -9
            raise CalledProcessError(rc, cell)

    shebang.__skip_doctest__ = os.name != "posix"

    async def _run_script(self, p, cell, to_close):
        """callback for running the script in the background"""

        p.stdin.write(cell)
        await p.stdin.drain()
        p.stdin.close()
        await p.stdin.wait_closed()
        await p.wait()
        # asyncio read pipes have no close
        # but we should drain the data anyway
        for s in to_close:
            await s.read()
        self._gc_bg_processes()

    @line_magic("killbgscripts")
    def killbgscripts(self, _nouse_=''):
        """Kill all BG processes started by %%script and its family."""
        self.kill_bg_processes()
        print("All background processes were killed.")

    def kill_bg_processes(self):
        """Kill all BG processes which are still running."""
        # Escalation steps: (action, seconds to wait afterwards).
        steps = (
            (lambda proc: proc.send_signal(signal.SIGINT), 0.1),
            (lambda proc: proc.terminate(), 0.1),
            (lambda proc: proc.kill(), 0),
        )
        for stop, grace in steps:
            if not self.bg_processes:
                return
            for p in self.bg_processes:
                if p.returncode is None:
                    try:
                        stop(p)
                    except Exception:
                        # best effort: a failure on one process must not
                        # stop cleanup of the others
                        pass
            if grace:
                time.sleep(grace)
            self._gc_bg_processes()

    def _gc_bg_processes(self):
        """Forget background processes that already have a return code."""
        self.bg_processes = [p for p in self.bg_processes if p.returncode is None]
General Comments 0
You need to be logged in to leave comments. Login now