##// END OF EJS Templates
Merge pull request #1946 from tkf/terminal-image-handler...
Bussonnier Matthias -
r8505:ee83d92c merge
parent child Browse files
Show More
@@ -0,0 +1,95
1 #-----------------------------------------------------------------------------
2 # Copyright (C) 2012 The IPython Development Team
3 #
4 # Distributed under the terms of the BSD License. The full license is in
5 # the file COPYING, distributed as part of this software.
6 #-----------------------------------------------------------------------------
7
8 import os
9 import sys
10 import unittest
11 import base64
12
13 from IPython.zmq.kernelmanager import KernelManager
14 from IPython.frontend.terminal.console.interactiveshell \
15 import ZMQTerminalInteractiveShell
16 from IPython.utils.tempdir import TemporaryDirectory
17 from IPython.testing.tools import monkeypatch
18 from IPython.testing.decorators import skip_without
19 from IPython.utils.ipstruct import Struct
20
21
22 SCRIPT_PATH = os.path.join(
23 os.path.abspath(os.path.dirname(__file__)), 'writetofile.py')
24
25
26 class ZMQTerminalInteractiveShellTestCase(unittest.TestCase):
27
28 def setUp(self):
29 km = KernelManager()
30 self.shell = ZMQTerminalInteractiveShell(kernel_manager=km)
31 self.raw = b'dummy data'
32 self.mime = 'image/png'
33 self.data = {self.mime: base64.encodestring(self.raw).decode('ascii')}
34
35 def test_no_call_by_default(self):
36 def raise_if_called(*args, **kwds):
37 assert False
38
39 shell = self.shell
40 shell.handle_image_PIL = raise_if_called
41 shell.handle_image_stream = raise_if_called
42 shell.handle_image_tempfile = raise_if_called
43 shell.handle_image_callable = raise_if_called
44
45 shell.handle_image(None, None) # arguments are dummy
46
47 @skip_without('PIL')
48 def test_handle_image_PIL(self):
49 import PIL.Image
50
51 open_called_with = []
52 show_called_with = []
53
54 def fake_open(arg):
55 open_called_with.append(arg)
56 return Struct(show=lambda: show_called_with.append(None))
57
58 with monkeypatch(PIL.Image, 'open', fake_open):
59 self.shell.handle_image_PIL(self.data, self.mime)
60
61 self.assertEqual(len(open_called_with), 1)
62 self.assertEqual(len(show_called_with), 1)
63 self.assertEqual(open_called_with[0].getvalue(), self.raw)
64
65 def check_handler_with_file(self, inpath, handler):
66 shell = self.shell
67 configname = '{0}_image_handler'.format(handler)
68 funcname = 'handle_image_{0}'.format(handler)
69
70 assert hasattr(shell, configname)
71 assert hasattr(shell, funcname)
72
73 with TemporaryDirectory() as tmpdir:
74 outpath = os.path.join(tmpdir, 'data')
75 cmd = [sys.executable, SCRIPT_PATH, inpath, outpath]
76 setattr(shell, configname, cmd)
77 getattr(shell, funcname)(self.data, self.mime)
78 # cmd is called and file is closed. So it's safe to open now.
79 with open(outpath, 'rb') as file:
80 transferred = file.read()
81
82 self.assertEqual(transferred, self.raw)
83
84 def test_handle_image_stream(self):
85 self.check_handler_with_file('-', 'stream')
86
87 def test_handle_image_tempfile(self):
88 self.check_handler_with_file('{file}', 'tempfile')
89
90 def test_handle_image_callable(self):
91 called_with = []
92 self.shell.callable_image_handler = called_with.append
93 self.shell.handle_image_callable(self.data, self.mime)
94 self.assertEqual(len(called_with), 1)
95 assert called_with[0] is self.data
@@ -0,0 +1,33
1 #-----------------------------------------------------------------------------
2 # Copyright (C) 2012 The IPython Development Team
3 #
4 # Distributed under the terms of the BSD License. The full license is in
5 # the file COPYING, distributed as part of this software.
6 #-----------------------------------------------------------------------------
7
8 """
9 Copy data from input file to output file for testing.
10
11 Command line usage:
12
13 python writetofile.py INPUT OUTPUT
14
15 Binary data from INPUT file is copied to OUTPUT file.
16 If INPUT is '-', stdin is used.
17
18 """
19
20 if __name__ == '__main__':
21 import sys
22 from IPython.utils.py3compat import PY3
23 (inpath, outpath) = sys.argv[1:]
24
25 if inpath == '-':
26 if PY3:
27 infile = sys.stdin.buffer
28 else:
29 infile = sys.stdin
30 else:
31 infile = open(inpath, 'rb')
32
33 open(outpath, 'w+b').write(infile.read())
@@ -0,0 +1,20
1 #-----------------------------------------------------------------------------
2 # Copyright (C) 2012- The IPython Development Team
3 #
4 # Distributed under the terms of the BSD License. The full license is in
5 # the file COPYING, distributed as part of this software.
6 #-----------------------------------------------------------------------------
7
8 import os
9
10 from IPython.utils.tempdir import NamedFileInTemporaryDirectory
11
12
13 def test_named_file_in_temporary_directory():
14 with NamedFileInTemporaryDirectory('filename') as file:
15 name = file.name
16 assert not file.closed
17 assert os.path.exists(name)
18 file.write(b'test')
19 assert file.closed
20 assert not os.path.exists(name)
@@ -1,342 +1,466
1 1 # -*- coding: utf-8 -*-
2 2 """Frontend of ipython working with python-zmq
3 3
4 4 Ipython's frontend, is a ipython interface that send request to kernel and proccess the kernel's outputs.
5 5
6 6 For more details, see the ipython-zmq design
7 7 """
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18 from __future__ import print_function
19 19
20 20 import bdb
21 21 import signal
22 import os
22 23 import sys
23 24 import time
25 import subprocess
26 from io import BytesIO
27 import base64
24 28
25 29 from Queue import Empty
26 30
31 try:
32 from contextlib import nested
33 except:
34 from IPython.utils.nested_context import nested
35
27 36 from IPython.core.alias import AliasManager, AliasError
28 37 from IPython.core import page
29 38 from IPython.utils.warn import warn, error, fatal
30 39 from IPython.utils import io
40 from IPython.utils.traitlets import List, Enum, Any
41 from IPython.utils.tempdir import NamedFileInTemporaryDirectory
31 42
32 43 from IPython.frontend.terminal.interactiveshell import TerminalInteractiveShell
33 44 from IPython.frontend.terminal.console.completer import ZMQCompleter
34 45
35 46
36 47 class ZMQTerminalInteractiveShell(TerminalInteractiveShell):
37 48 """A subclass of TerminalInteractiveShell that uses the 0MQ kernel"""
38 49 _executing = False
39
50
51 image_handler = Enum(('PIL', 'stream', 'tempfile', 'callable'),
52 config=True, help=
53 """
54 Handler for image type output. This is useful, for example,
55 when connecting to the kernel in which pylab inline backend is
56 activated. There are four handlers defined. 'PIL': Use
57 Python Imaging Library to popup image; 'stream': Use an
58 external program to show the image. Image will be fed into
59 the STDIN of the program. You will need to configure
60 `stream_image_handler`; 'tempfile': Use an external program to
61 show the image. Image will be saved in a temporally file and
62 the program is called with the temporally file. You will need
63 to configure `tempfile_image_handler`; 'callable': You can set
64 any Python callable which is called with the image data. You
65 will need to configure `callable_image_handler`.
66 """
67 )
68
69 stream_image_handler = List(config=True, help=
70 """
71 Command to invoke an image viewer program when you are using
72 'stream' image handler. This option is a list of string where
73 the first element is the command itself and reminders are the
74 options for the command. Raw image data is given as STDIN to
75 the program.
76 """
77 )
78
79 tempfile_image_handler = List(config=True, help=
80 """
81 Command to invoke an image viewer program when you are using
82 'tempfile' image handler. This option is a list of string
83 where the first element is the command itself and reminders
84 are the options for the command. You can use {file} and
85 {format} in the string to represent the location of the
86 generated image file and image format.
87 """
88 )
89
90 callable_image_handler = Any(config=True, help=
91 """
92 Callable object called via 'callable' image handler with one
93 argument, `data`, which is `msg["content"]["data"]` where
94 `msg` is the message from iopub channel. For exmaple, you can
95 find base64 encoded PNG data as `data['image/png']`.
96 """
97 )
98
99 mime_preference = List(
100 default_value=['image/png', 'image/jpeg', 'image/svg+xml'],
101 config=True, allow_none=False, help=
102 """
103 Preferred object representation MIME type in order. First
104 matched MIME type will be used.
105 """
106 )
107
40 108 def __init__(self, *args, **kwargs):
41 109 self.km = kwargs.pop('kernel_manager')
42 110 self.session_id = self.km.session.session
43 111 super(ZMQTerminalInteractiveShell, self).__init__(*args, **kwargs)
44 112
45 113 def init_completer(self):
46 114 """Initialize the completion machinery.
47 115
48 116 This creates completion machinery that can be used by client code,
49 117 either interactively in-process (typically triggered by the readline
50 118 library), programatically (such as in test suites) or out-of-prcess
51 119 (typically over the network by remote frontends).
52 120 """
53 121 from IPython.core.completerlib import (module_completer,
54 122 magic_run_completer, cd_completer)
55 123
56 124 self.Completer = ZMQCompleter(self, self.km)
57 125
58 126
59 127 self.set_hook('complete_command', module_completer, str_key = 'import')
60 128 self.set_hook('complete_command', module_completer, str_key = 'from')
61 129 self.set_hook('complete_command', magic_run_completer, str_key = '%run')
62 130 self.set_hook('complete_command', cd_completer, str_key = '%cd')
63 131
64 132 # Only configure readline if we truly are using readline. IPython can
65 133 # do tab-completion over the network, in GUIs, etc, where readline
66 134 # itself may be absent
67 135 if self.has_readline:
68 136 self.set_readline_completer()
69 137
70 138 def run_cell(self, cell, store_history=True):
71 139 """Run a complete IPython cell.
72 140
73 141 Parameters
74 142 ----------
75 143 cell : str
76 144 The code (including IPython code such as %magic functions) to run.
77 145 store_history : bool
78 146 If True, the raw and translated cell will be stored in IPython's
79 147 history. For user code calling back into IPython's machinery, this
80 148 should be set to False.
81 149 """
82 150 if (not cell) or cell.isspace():
83 151 return
84 152
85 153 if cell.strip() == 'exit':
86 154 # explicitly handle 'exit' command
87 155 return self.ask_exit()
88 156
89 157 self._executing = True
90 158 # flush stale replies, which could have been ignored, due to missed heartbeats
91 159 while self.km.shell_channel.msg_ready():
92 160 self.km.shell_channel.get_msg()
93 161 # shell_channel.execute takes 'hidden', which is the inverse of store_hist
94 162 msg_id = self.km.shell_channel.execute(cell, not store_history)
95 163 while not self.km.shell_channel.msg_ready() and self.km.is_alive:
96 164 try:
97 165 self.handle_stdin_request(timeout=0.05)
98 166 except Empty:
99 167 # display intermediate print statements, etc.
100 168 self.handle_iopub()
101 169 pass
102 170 if self.km.shell_channel.msg_ready():
103 171 self.handle_execute_reply(msg_id)
104 172 self._executing = False
105 173
106 174 #-----------------
107 175 # message handlers
108 176 #-----------------
109 177
110 178 def handle_execute_reply(self, msg_id):
111 179 msg = self.km.shell_channel.get_msg()
112 180 if msg["parent_header"].get("msg_id", None) == msg_id:
113 181
114 182 self.handle_iopub()
115 183
116 184 content = msg["content"]
117 185 status = content['status']
118 186
119 187 if status == 'aborted':
120 188 self.write('Aborted\n')
121 189 return
122 190 elif status == 'ok':
123 191 # print execution payloads as well:
124 192 for item in content["payload"]:
125 193 text = item.get('text', None)
126 194 if text:
127 195 page.page(text)
128 196
129 197 elif status == 'error':
130 198 for frame in content["traceback"]:
131 199 print(frame, file=io.stderr)
132 200
133 201 self.execution_count = int(content["execution_count"] + 1)
134 202
135 203
136 204 def handle_iopub(self):
137 205 """ Method to procces subscribe channel's messages
138 206
139 207 This method reads a message and processes the content in different
140 208 outputs like stdout, stderr, pyout and status
141 209
142 210 Arguments:
143 211 sub_msg: message receive from kernel in the sub socket channel
144 212 capture by kernel manager.
145 213 """
146 214 while self.km.sub_channel.msg_ready():
147 215 sub_msg = self.km.sub_channel.get_msg()
148 216 msg_type = sub_msg['header']['msg_type']
149 217 parent = sub_msg["parent_header"]
150 218 if (not parent) or self.session_id == parent['session']:
151 219 if msg_type == 'status' :
152 220 if sub_msg["content"]["execution_state"] == "busy" :
153 221 pass
154 222
155 223 elif msg_type == 'stream' :
156 224 if sub_msg["content"]["name"] == "stdout":
157 225 print(sub_msg["content"]["data"], file=io.stdout, end="")
158 226 io.stdout.flush()
159 227 elif sub_msg["content"]["name"] == "stderr" :
160 228 print(sub_msg["content"]["data"], file=io.stderr, end="")
161 229 io.stderr.flush()
162 230
163 231 elif msg_type == 'pyout':
164 232 self.execution_count = int(sub_msg["content"]["execution_count"])
165 233 format_dict = sub_msg["content"]["data"]
234 self.handle_rich_data(format_dict)
166 235 # taken from DisplayHook.__call__:
167 236 hook = self.displayhook
168 237 hook.start_displayhook()
169 238 hook.write_output_prompt()
170 239 hook.write_format_data(format_dict)
171 240 hook.log_output(format_dict)
172 241 hook.finish_displayhook()
173 242
243 elif msg_type == 'display_data':
244 self.handle_rich_data(sub_msg["content"]["data"])
245
246 _imagemime = {
247 'image/png': 'png',
248 'image/jpeg': 'jpeg',
249 'image/svg+xml': 'svg',
250 }
251
252 def handle_rich_data(self, data):
253 for mime in self.mime_preference:
254 if mime in data and mime in self._imagemime:
255 self.handle_image(data, mime)
256 return
257
258 def handle_image(self, data, mime):
259 handler = getattr(
260 self, 'handle_image_{0}'.format(self.image_handler), None)
261 if handler:
262 handler(data, mime)
263
264 def handle_image_PIL(self, data, mime):
265 if mime not in ('image/png', 'image/jpeg'):
266 return
267 import PIL.Image
268 raw = base64.decodestring(data[mime].encode('ascii'))
269 img = PIL.Image.open(BytesIO(raw))
270 img.show()
271
272 def handle_image_stream(self, data, mime):
273 raw = base64.decodestring(data[mime].encode('ascii'))
274 imageformat = self._imagemime[mime]
275 fmt = dict(format=imageformat)
276 args = [s.format(**fmt) for s in self.stream_image_handler]
277 with open(os.devnull, 'w') as devnull:
278 proc = subprocess.Popen(
279 args, stdin=subprocess.PIPE,
280 stdout=devnull, stderr=devnull)
281 proc.communicate(raw)
282
283 def handle_image_tempfile(self, data, mime):
284 raw = base64.decodestring(data[mime].encode('ascii'))
285 imageformat = self._imagemime[mime]
286 filename = 'tmp.{0}'.format(imageformat)
287 with nested(NamedFileInTemporaryDirectory(filename),
288 open(os.devnull, 'w')) as (f, devnull):
289 f.write(raw)
290 f.flush()
291 fmt = dict(file=f.name, format=imageformat)
292 args = [s.format(**fmt) for s in self.tempfile_image_handler]
293 subprocess.call(args, stdout=devnull, stderr=devnull)
294
295 def handle_image_callable(self, data, mime):
296 self.callable_image_handler(data)
297
174 298 def handle_stdin_request(self, timeout=0.1):
175 299 """ Method to capture raw_input
176 300 """
177 301 msg_rep = self.km.stdin_channel.get_msg(timeout=timeout)
178 302 # in case any iopub came while we were waiting:
179 303 self.handle_iopub()
180 304 if self.session_id == msg_rep["parent_header"].get("session"):
181 305 # wrap SIGINT handler
182 306 real_handler = signal.getsignal(signal.SIGINT)
183 307 def double_int(sig,frame):
184 308 # call real handler (forwards sigint to kernel),
185 309 # then raise local interrupt, stopping local raw_input
186 310 real_handler(sig,frame)
187 311 raise KeyboardInterrupt
188 312 signal.signal(signal.SIGINT, double_int)
189 313
190 314 try:
191 315 raw_data = raw_input(msg_rep["content"]["prompt"])
192 316 except EOFError:
193 317 # turn EOFError into EOF character
194 318 raw_data = '\x04'
195 319 except KeyboardInterrupt:
196 320 sys.stdout.write('\n')
197 321 return
198 322 finally:
199 323 # restore SIGINT handler
200 324 signal.signal(signal.SIGINT, real_handler)
201 325
202 326 # only send stdin reply if there *was not* another request
203 327 # or execution finished while we were reading.
204 328 if not (self.km.stdin_channel.msg_ready() or self.km.shell_channel.msg_ready()):
205 329 self.km.stdin_channel.input(raw_data)
206 330
207 331 def mainloop(self, display_banner=False):
208 332 while True:
209 333 try:
210 334 self.interact(display_banner=display_banner)
211 335 #self.interact_with_readline()
212 336 # XXX for testing of a readline-decoupled repl loop, call
213 337 # interact_with_readline above
214 338 break
215 339 except KeyboardInterrupt:
216 340 # this should not be necessary, but KeyboardInterrupt
217 341 # handling seems rather unpredictable...
218 342 self.write("\nKeyboardInterrupt in interact()\n")
219 343
220 344 def wait_for_kernel(self, timeout=None):
221 345 """method to wait for a kernel to be ready"""
222 346 tic = time.time()
223 347 self.km.hb_channel.unpause()
224 348 while True:
225 349 self.run_cell('1', False)
226 350 if self.km.hb_channel.is_beating():
227 351 # heart failure was not the reason this returned
228 352 break
229 353 else:
230 354 # heart failed
231 355 if timeout is not None and (time.time() - tic) > timeout:
232 356 return False
233 357 return True
234 358
235 359 def interact(self, display_banner=None):
236 360 """Closely emulate the interactive Python console."""
237 361
238 362 # batch run -> do not interact
239 363 if self.exit_now:
240 364 return
241 365
242 366 if display_banner is None:
243 367 display_banner = self.display_banner
244 368
245 369 if isinstance(display_banner, basestring):
246 370 self.show_banner(display_banner)
247 371 elif display_banner:
248 372 self.show_banner()
249 373
250 374 more = False
251 375
252 376 # run a non-empty no-op, so that we don't get a prompt until
253 377 # we know the kernel is ready. This keeps the connection
254 378 # message above the first prompt.
255 379 if not self.wait_for_kernel(3):
256 380 error("Kernel did not respond\n")
257 381 return
258 382
259 383 if self.has_readline:
260 384 self.readline_startup_hook(self.pre_readline)
261 385 hlen_b4_cell = self.readline.get_current_history_length()
262 386 else:
263 387 hlen_b4_cell = 0
264 388 # exit_now is set by a call to %Exit or %Quit, through the
265 389 # ask_exit callback.
266 390
267 391 while not self.exit_now:
268 392 if not self.km.is_alive:
269 393 # kernel died, prompt for action or exit
270 394 action = "restart" if self.km.has_kernel else "wait for restart"
271 395 ans = self.ask_yes_no("kernel died, %s ([y]/n)?" % action, default='y')
272 396 if ans:
273 397 if self.km.has_kernel:
274 398 self.km.restart_kernel(True)
275 399 self.wait_for_kernel(3)
276 400 else:
277 401 self.exit_now = True
278 402 continue
279 403 try:
280 404 # protect prompt block from KeyboardInterrupt
281 405 # when sitting on ctrl-C
282 406 self.hooks.pre_prompt_hook()
283 407 if more:
284 408 try:
285 409 prompt = self.prompt_manager.render('in2')
286 410 except Exception:
287 411 self.showtraceback()
288 412 if self.autoindent:
289 413 self.rl_do_indent = True
290 414
291 415 else:
292 416 try:
293 417 prompt = self.separate_in + self.prompt_manager.render('in')
294 418 except Exception:
295 419 self.showtraceback()
296 420
297 421 line = self.raw_input(prompt)
298 422 if self.exit_now:
299 423 # quick exit on sys.std[in|out] close
300 424 break
301 425 if self.autoindent:
302 426 self.rl_do_indent = False
303 427
304 428 except KeyboardInterrupt:
305 429 #double-guard against keyboardinterrupts during kbdint handling
306 430 try:
307 431 self.write('\nKeyboardInterrupt\n')
308 432 source_raw = self.input_splitter.source_raw_reset()[1]
309 433 hlen_b4_cell = self._replace_rlhist_multiline(source_raw, hlen_b4_cell)
310 434 more = False
311 435 except KeyboardInterrupt:
312 436 pass
313 437 except EOFError:
314 438 if self.autoindent:
315 439 self.rl_do_indent = False
316 440 if self.has_readline:
317 441 self.readline_startup_hook(None)
318 442 self.write('\n')
319 443 self.exit()
320 444 except bdb.BdbQuit:
321 445 warn('The Python debugger has exited with a BdbQuit exception.\n'
322 446 'Because of how pdb handles the stack, it is impossible\n'
323 447 'for IPython to properly format this particular exception.\n'
324 448 'IPython will resume normal operation.')
325 449 except:
326 450 # exceptions here are VERY RARE, but they can be triggered
327 451 # asynchronously by signal handlers, for example.
328 452 self.showtraceback()
329 453 else:
330 454 self.input_splitter.push(line)
331 455 more = self.input_splitter.push_accepts_more()
332 456 if (self.SyntaxTB.last_syntax_error and
333 457 self.autoedit_syntax):
334 458 self.edit_syntax_error()
335 459 if not more:
336 460 source_raw = self.input_splitter.source_raw_reset()[1]
337 461 hlen_b4_cell = self._replace_rlhist_multiline(source_raw, hlen_b4_cell)
338 462 self.run_cell(source_raw)
339 463
340 464
341 465 # Turn off the exit flag, so the mainloop can be restarted if desired
342 466 self.exit_now = False
@@ -1,76 +1,106
1 1 """TemporaryDirectory class, copied from Python 3.2.
2 2
3 3 This is copied from the stdlib and will be standard in Python 3.2 and onwards.
4 4 """
5 5
6 import os as _os
7
6 8 # This code should only be used in Python versions < 3.2, since after that we
7 9 # can rely on the stdlib itself.
8 10 try:
9 11 from tempfile import TemporaryDirectory
10 12
11 13 except ImportError:
12
13 import os as _os
14 14 from tempfile import mkdtemp, template
15 15
16 16 class TemporaryDirectory(object):
17 17 """Create and return a temporary directory. This has the same
18 18 behavior as mkdtemp but can be used as a context manager. For
19 19 example:
20 20
21 21 with TemporaryDirectory() as tmpdir:
22 22 ...
23 23
24 24 Upon exiting the context, the directory and everthing contained
25 25 in it are removed.
26 26 """
27 27
28 28 def __init__(self, suffix="", prefix=template, dir=None):
29 29 self.name = mkdtemp(suffix, prefix, dir)
30 30 self._closed = False
31 31
32 32 def __enter__(self):
33 33 return self.name
34 34
35 35 def cleanup(self):
36 36 if not self._closed:
37 37 self._rmtree(self.name)
38 38 self._closed = True
39 39
40 40 def __exit__(self, exc, value, tb):
41 41 self.cleanup()
42 42
43 43 __del__ = cleanup
44 44
45 45
46 46 # XXX (ncoghlan): The following code attempts to make
47 47 # this class tolerant of the module nulling out process
48 48 # that happens during CPython interpreter shutdown
49 49 # Alas, it doesn't actually manage it. See issue #10188
50 50 _listdir = staticmethod(_os.listdir)
51 51 _path_join = staticmethod(_os.path.join)
52 52 _isdir = staticmethod(_os.path.isdir)
53 53 _remove = staticmethod(_os.remove)
54 54 _rmdir = staticmethod(_os.rmdir)
55 55 _os_error = _os.error
56 56
57 57 def _rmtree(self, path):
58 58 # Essentially a stripped down version of shutil.rmtree. We can't
59 59 # use globals because they may be None'ed out at shutdown.
60 60 for name in self._listdir(path):
61 61 fullname = self._path_join(path, name)
62 62 try:
63 63 isdir = self._isdir(fullname)
64 64 except self._os_error:
65 65 isdir = False
66 66 if isdir:
67 67 self._rmtree(fullname)
68 68 else:
69 69 try:
70 70 self._remove(fullname)
71 71 except self._os_error:
72 72 pass
73 73 try:
74 74 self._rmdir(path)
75 75 except self._os_error:
76 76 pass
77
78
79 class NamedFileInTemporaryDirectory(object):
80
81 def __init__(self, filename, mode='w+b', bufsize=-1, **kwds):
82 """
83 Open a file named `filename` in a temporary directory.
84
85 This context manager is preferred over `NamedTemporaryFile` in
86 stdlib `tempfile` when one needs to reopen the file.
87
88 Arguments `mode` and `bufsize` are passed to `open`.
89 Rest of the arguments are passed to `TemporaryDirectory`.
90
91 """
92 self._tmpdir = TemporaryDirectory(**kwds)
93 path = _os.path.join(self._tmpdir.name, filename)
94 self.file = open(path, mode, bufsize)
95
96 def cleanup(self):
97 self.file.close()
98 self._tmpdir.cleanup()
99
100 __del__ = cleanup
101
102 def __enter__(self):
103 return self.file
104
105 def __exit__(self, type, value, traceback):
106 self.cleanup()
General Comments 0
You need to be logged in to leave comments. Login now