##// END OF EJS Templates
Start threads for %%script as daemon thread
Takafumi Arakaki -
Show More
@@ -1,276 +1,276
1 """Magic functions for running cells in various scripts."""
1 """Magic functions for running cells in various scripts."""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (c) 2012 The IPython Development Team.
3 # Copyright (c) 2012 The IPython Development Team.
4 #
4 #
5 # Distributed under the terms of the Modified BSD License.
5 # Distributed under the terms of the Modified BSD License.
6 #
6 #
7 # The full license is in the file COPYING.txt, distributed with this software.
7 # The full license is in the file COPYING.txt, distributed with this software.
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9
9
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 # Stdlib
14 # Stdlib
15 import os
15 import os
16 import re
16 import re
17 import sys
17 import sys
18 import signal
18 import signal
19 import time
19 import time
20 from subprocess import Popen, PIPE
20 from subprocess import Popen, PIPE
21
21
22 # Our own packages
22 # Our own packages
23 from IPython.config.configurable import Configurable
23 from IPython.config.configurable import Configurable
24 from IPython.core import magic_arguments
24 from IPython.core import magic_arguments
25 from IPython.core.error import UsageError
25 from IPython.core.error import UsageError
26 from IPython.core.magic import (
26 from IPython.core.magic import (
27 Magics, magics_class, line_magic, cell_magic
27 Magics, magics_class, line_magic, cell_magic
28 )
28 )
29 from IPython.lib.backgroundjobs import BackgroundJobManager
29 from IPython.lib.backgroundjobs import BackgroundJobManager
30 from IPython.testing.skipdoctest import skip_doctest
30 from IPython.testing.skipdoctest import skip_doctest
31 from IPython.utils import py3compat
31 from IPython.utils import py3compat
32 from IPython.utils.process import find_cmd, FindCmdError, arg_split
32 from IPython.utils.process import find_cmd, FindCmdError, arg_split
33 from IPython.utils.traitlets import List, Dict
33 from IPython.utils.traitlets import List, Dict
34
34
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36 # Magic implementation classes
36 # Magic implementation classes
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38
38
39 def script_args(f):
39 def script_args(f):
40 """single decorator for adding script args"""
40 """single decorator for adding script args"""
41 args = [
41 args = [
42 magic_arguments.argument(
42 magic_arguments.argument(
43 '--out', type=str,
43 '--out', type=str,
44 help="""The variable in which to store stdout from the script.
44 help="""The variable in which to store stdout from the script.
45 If the script is backgrounded, this will be the stdout *pipe*,
45 If the script is backgrounded, this will be the stdout *pipe*,
46 instead of the stderr text itself.
46 instead of the stderr text itself.
47 """
47 """
48 ),
48 ),
49 magic_arguments.argument(
49 magic_arguments.argument(
50 '--err', type=str,
50 '--err', type=str,
51 help="""The variable in which to store stderr from the script.
51 help="""The variable in which to store stderr from the script.
52 If the script is backgrounded, this will be the stderr *pipe*,
52 If the script is backgrounded, this will be the stderr *pipe*,
53 instead of the stderr text itself.
53 instead of the stderr text itself.
54 """
54 """
55 ),
55 ),
56 magic_arguments.argument(
56 magic_arguments.argument(
57 '--bg', action="store_true",
57 '--bg', action="store_true",
58 help="""Whether to run the script in the background.
58 help="""Whether to run the script in the background.
59 If given, the only way to see the output of the command is
59 If given, the only way to see the output of the command is
60 with --out/err.
60 with --out/err.
61 """
61 """
62 ),
62 ),
63 magic_arguments.argument(
63 magic_arguments.argument(
64 '--proc', type=str,
64 '--proc', type=str,
65 help="""The variable in which to store Popen instance.
65 help="""The variable in which to store Popen instance.
66 This is used only when --bg option is given.
66 This is used only when --bg option is given.
67 """
67 """
68 ),
68 ),
69 ]
69 ]
70 for arg in args:
70 for arg in args:
71 f = arg(f)
71 f = arg(f)
72 return f
72 return f
73
73
74 @magics_class
74 @magics_class
75 class ScriptMagics(Magics, Configurable):
75 class ScriptMagics(Magics, Configurable):
76 """Magics for talking to scripts
76 """Magics for talking to scripts
77
77
78 This defines a base `%%script` cell magic for running a cell
78 This defines a base `%%script` cell magic for running a cell
79 with a program in a subprocess, and registers a few top-level
79 with a program in a subprocess, and registers a few top-level
80 magics that call %%script with common interpreters.
80 magics that call %%script with common interpreters.
81 """
81 """
82 script_magics = List(config=True,
82 script_magics = List(config=True,
83 help="""Extra script cell magics to define
83 help="""Extra script cell magics to define
84
84
85 This generates simple wrappers of `%%script foo` as `%%foo`.
85 This generates simple wrappers of `%%script foo` as `%%foo`.
86
86
87 If you want to add script magics that aren't on your path,
87 If you want to add script magics that aren't on your path,
88 specify them in script_paths
88 specify them in script_paths
89 """,
89 """,
90 )
90 )
91 def _script_magics_default(self):
91 def _script_magics_default(self):
92 """default to a common list of programs if we find them"""
92 """default to a common list of programs if we find them"""
93
93
94 defaults = []
94 defaults = []
95 to_try = []
95 to_try = []
96 if os.name == 'nt':
96 if os.name == 'nt':
97 defaults.append('cmd')
97 defaults.append('cmd')
98 to_try.append('powershell')
98 to_try.append('powershell')
99 to_try.extend([
99 to_try.extend([
100 'sh',
100 'sh',
101 'bash',
101 'bash',
102 'perl',
102 'perl',
103 'ruby',
103 'ruby',
104 'python3',
104 'python3',
105 'pypy',
105 'pypy',
106 ])
106 ])
107
107
108 for cmd in to_try:
108 for cmd in to_try:
109 if cmd in self.script_paths:
109 if cmd in self.script_paths:
110 defaults.append(cmd)
110 defaults.append(cmd)
111 else:
111 else:
112 try:
112 try:
113 find_cmd(cmd)
113 find_cmd(cmd)
114 except FindCmdError:
114 except FindCmdError:
115 # command not found, ignore it
115 # command not found, ignore it
116 pass
116 pass
117 except ImportError:
117 except ImportError:
118 # Windows without pywin32, find_cmd doesn't work
118 # Windows without pywin32, find_cmd doesn't work
119 pass
119 pass
120 else:
120 else:
121 defaults.append(cmd)
121 defaults.append(cmd)
122 return defaults
122 return defaults
123
123
124 script_paths = Dict(config=True,
124 script_paths = Dict(config=True,
125 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
125 help="""Dict mapping short 'ruby' names to full paths, such as '/opt/secret/bin/ruby'
126
126
127 Only necessary for items in script_magics where the default path will not
127 Only necessary for items in script_magics where the default path will not
128 find the right interpreter.
128 find the right interpreter.
129 """
129 """
130 )
130 )
131
131
132 def __init__(self, shell=None):
132 def __init__(self, shell=None):
133 Configurable.__init__(self, config=shell.config)
133 Configurable.__init__(self, config=shell.config)
134 self._generate_script_magics()
134 self._generate_script_magics()
135 Magics.__init__(self, shell=shell)
135 Magics.__init__(self, shell=shell)
136 self.job_manager = BackgroundJobManager()
136 self.job_manager = BackgroundJobManager()
137 self.bg_processes = []
137 self.bg_processes = []
138
138
139 def __del__(self):
139 def __del__(self):
140 self.kill_bg_processes()
140 self.kill_bg_processes()
141
141
142 def _generate_script_magics(self):
142 def _generate_script_magics(self):
143 cell_magics = self.magics['cell']
143 cell_magics = self.magics['cell']
144 for name in self.script_magics:
144 for name in self.script_magics:
145 cell_magics[name] = self._make_script_magic(name)
145 cell_magics[name] = self._make_script_magic(name)
146
146
147 def _make_script_magic(self, name):
147 def _make_script_magic(self, name):
148 """make a named magic, that calls %%script with a particular program"""
148 """make a named magic, that calls %%script with a particular program"""
149 # expand to explicit path if necessary:
149 # expand to explicit path if necessary:
150 script = self.script_paths.get(name, name)
150 script = self.script_paths.get(name, name)
151
151
152 @magic_arguments.magic_arguments()
152 @magic_arguments.magic_arguments()
153 @script_args
153 @script_args
154 def named_script_magic(line, cell):
154 def named_script_magic(line, cell):
155 # if line, add it as cl-flags
155 # if line, add it as cl-flags
156 if line:
156 if line:
157 line = "%s %s" % (script, line)
157 line = "%s %s" % (script, line)
158 else:
158 else:
159 line = script
159 line = script
160 return self.shebang(line, cell)
160 return self.shebang(line, cell)
161
161
162 # write a basic docstring:
162 # write a basic docstring:
163 named_script_magic.__doc__ = \
163 named_script_magic.__doc__ = \
164 """%%{name} script magic
164 """%%{name} script magic
165
165
166 Run cells with {script} in a subprocess.
166 Run cells with {script} in a subprocess.
167
167
168 This is a shortcut for `%%script {script}`
168 This is a shortcut for `%%script {script}`
169 """.format(**locals())
169 """.format(**locals())
170
170
171 return named_script_magic
171 return named_script_magic
172
172
173 @magic_arguments.magic_arguments()
173 @magic_arguments.magic_arguments()
174 @script_args
174 @script_args
175 @cell_magic("script")
175 @cell_magic("script")
176 def shebang(self, line, cell):
176 def shebang(self, line, cell):
177 """Run a cell via a shell command
177 """Run a cell via a shell command
178
178
179 The `%%script` line is like the #! line of script,
179 The `%%script` line is like the #! line of script,
180 specifying a program (bash, perl, ruby, etc.) with which to run.
180 specifying a program (bash, perl, ruby, etc.) with which to run.
181
181
182 The rest of the cell is run by that program.
182 The rest of the cell is run by that program.
183
183
184 Examples
184 Examples
185 --------
185 --------
186 ::
186 ::
187
187
188 In [1]: %%script bash
188 In [1]: %%script bash
189 ...: for i in 1 2 3; do
189 ...: for i in 1 2 3; do
190 ...: echo $i
190 ...: echo $i
191 ...: done
191 ...: done
192 1
192 1
193 2
193 2
194 3
194 3
195 """
195 """
196 argv = arg_split(line, posix = not sys.platform.startswith('win'))
196 argv = arg_split(line, posix = not sys.platform.startswith('win'))
197 args, cmd = self.shebang.parser.parse_known_args(argv)
197 args, cmd = self.shebang.parser.parse_known_args(argv)
198
198
199 p = Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE)
199 p = Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE)
200
200
201 cell = cell.encode('utf8', 'replace')
201 cell = cell.encode('utf8', 'replace')
202 if args.bg:
202 if args.bg:
203 self.bg_processes.append(p)
203 self.bg_processes.append(p)
204 if args.out:
204 if args.out:
205 self.shell.user_ns[args.out] = p.stdout
205 self.shell.user_ns[args.out] = p.stdout
206 if args.err:
206 if args.err:
207 self.shell.user_ns[args.err] = p.stderr
207 self.shell.user_ns[args.err] = p.stderr
208 self.job_manager.new(self._run_script, p, cell)
208 self.job_manager.new(self._run_script, p, cell, daemon=True)
209 if args.proc:
209 if args.proc:
210 self.shell.user_ns[args.proc] = p
210 self.shell.user_ns[args.proc] = p
211 return
211 return
212
212
213 try:
213 try:
214 out, err = p.communicate(cell)
214 out, err = p.communicate(cell)
215 except KeyboardInterrupt:
215 except KeyboardInterrupt:
216 try:
216 try:
217 p.send_signal(signal.SIGINT)
217 p.send_signal(signal.SIGINT)
218 time.sleep(0.1)
218 time.sleep(0.1)
219 if p.poll() is not None:
219 if p.poll() is not None:
220 print "Process is interrupted."
220 print "Process is interrupted."
221 return
221 return
222 p.terminate()
222 p.terminate()
223 time.sleep(0.1)
223 time.sleep(0.1)
224 if p.poll() is not None:
224 if p.poll() is not None:
225 print "Process is terminated."
225 print "Process is terminated."
226 return
226 return
227 p.kill()
227 p.kill()
228 print "Process is killed."
228 print "Process is killed."
229 except OSError:
229 except OSError:
230 pass
230 pass
231 except Exception as e:
231 except Exception as e:
232 print "Error while terminating subprocess (pid=%i): %s" \
232 print "Error while terminating subprocess (pid=%i): %s" \
233 % (p.pid, e)
233 % (p.pid, e)
234 return
234 return
235 out = py3compat.bytes_to_str(out)
235 out = py3compat.bytes_to_str(out)
236 err = py3compat.bytes_to_str(err)
236 err = py3compat.bytes_to_str(err)
237 if args.out:
237 if args.out:
238 self.shell.user_ns[args.out] = out
238 self.shell.user_ns[args.out] = out
239 else:
239 else:
240 sys.stdout.write(out)
240 sys.stdout.write(out)
241 sys.stdout.flush()
241 sys.stdout.flush()
242 if args.err:
242 if args.err:
243 self.shell.user_ns[args.err] = err
243 self.shell.user_ns[args.err] = err
244 else:
244 else:
245 sys.stderr.write(err)
245 sys.stderr.write(err)
246 sys.stderr.flush()
246 sys.stderr.flush()
247
247
248 def _run_script(self, p, cell):
248 def _run_script(self, p, cell):
249 """callback for running the script in the background"""
249 """callback for running the script in the background"""
250 p.stdin.write(cell)
250 p.stdin.write(cell)
251 p.stdin.close()
251 p.stdin.close()
252 p.wait()
252 p.wait()
253
253
254 @line_magic("killbgscripts")
254 @line_magic("killbgscripts")
255 def kill_bg_processes(self, dummy=None):
255 def kill_bg_processes(self, dummy=None):
256 """Kill all BG processes which are still running."""
256 """Kill all BG processes which are still running."""
257 for p in self.bg_processes:
257 for p in self.bg_processes:
258 if p.poll() is None:
258 if p.poll() is None:
259 try:
259 try:
260 p.send_signal(signal.SIGINT)
260 p.send_signal(signal.SIGINT)
261 except:
261 except:
262 pass
262 pass
263 time.sleep(0.1)
263 time.sleep(0.1)
264 for p in self.bg_processes:
264 for p in self.bg_processes:
265 if p.poll() is None:
265 if p.poll() is None:
266 try:
266 try:
267 p.terminate()
267 p.terminate()
268 except:
268 except:
269 pass
269 pass
270 time.sleep(0.1)
270 time.sleep(0.1)
271 for p in self.bg_processes:
271 for p in self.bg_processes:
272 if p.poll() is None:
272 if p.poll() is None:
273 try:
273 try:
274 p.kill()
274 p.kill()
275 except:
275 except:
276 pass
276 pass
@@ -1,480 +1,484
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """Manage background (threaded) jobs conveniently from an interactive shell.
2 """Manage background (threaded) jobs conveniently from an interactive shell.
3
3
4 This module provides a BackgroundJobManager class. This is the main class
4 This module provides a BackgroundJobManager class. This is the main class
5 meant for public usage, it implements an object which can create and manage
5 meant for public usage, it implements an object which can create and manage
6 new background jobs.
6 new background jobs.
7
7
8 It also provides the actual job classes managed by these BackgroundJobManager
8 It also provides the actual job classes managed by these BackgroundJobManager
9 objects, see their docstrings below.
9 objects, see their docstrings below.
10
10
11
11
12 This system was inspired by discussions with B. Granger and the
12 This system was inspired by discussions with B. Granger and the
13 BackgroundCommand class described in the book Python Scripting for
13 BackgroundCommand class described in the book Python Scripting for
14 Computational Science, by H. P. Langtangen:
14 Computational Science, by H. P. Langtangen:
15
15
16 http://folk.uio.no/hpl/scripting
16 http://folk.uio.no/hpl/scripting
17
17
18 (although ultimately no code from this text was used, as IPython's system is a
18 (although ultimately no code from this text was used, as IPython's system is a
19 separate implementation).
19 separate implementation).
20
20
21 An example notebook is provided in our documentation illustrating interactive
21 An example notebook is provided in our documentation illustrating interactive
22 use of the system.
22 use of the system.
23 """
23 """
24
24
25 #*****************************************************************************
25 #*****************************************************************************
26 # Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
26 # Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
27 #
27 #
28 # Distributed under the terms of the BSD License. The full license is in
28 # Distributed under the terms of the BSD License. The full license is in
29 # the file COPYING, distributed as part of this software.
29 # the file COPYING, distributed as part of this software.
30 #*****************************************************************************
30 #*****************************************************************************
31
31
32 # Code begins
32 # Code begins
33 import sys
33 import sys
34 import threading
34 import threading
35
35
36 from IPython.core.ultratb import AutoFormattedTB
36 from IPython.core.ultratb import AutoFormattedTB
37 from IPython.utils.warn import warn, error
37 from IPython.utils.warn import warn, error
38
38
39
39
40 class BackgroundJobManager(object):
40 class BackgroundJobManager(object):
41 """Class to manage a pool of backgrounded threaded jobs.
41 """Class to manage a pool of backgrounded threaded jobs.
42
42
43 Below, we assume that 'jobs' is a BackgroundJobManager instance.
43 Below, we assume that 'jobs' is a BackgroundJobManager instance.
44
44
45 Usage summary (see the method docstrings for details):
45 Usage summary (see the method docstrings for details):
46
46
47 jobs.new(...) -> start a new job
47 jobs.new(...) -> start a new job
48
48
49 jobs() or jobs.status() -> print status summary of all jobs
49 jobs() or jobs.status() -> print status summary of all jobs
50
50
51 jobs[N] -> returns job number N.
51 jobs[N] -> returns job number N.
52
52
53 foo = jobs[N].result -> assign to variable foo the result of job N
53 foo = jobs[N].result -> assign to variable foo the result of job N
54
54
55 jobs[N].traceback() -> print the traceback of dead job N
55 jobs[N].traceback() -> print the traceback of dead job N
56
56
57 jobs.remove(N) -> remove (finished) job N
57 jobs.remove(N) -> remove (finished) job N
58
58
59 jobs.flush() -> remove all finished jobs
59 jobs.flush() -> remove all finished jobs
60
60
61 As a convenience feature, BackgroundJobManager instances provide the
61 As a convenience feature, BackgroundJobManager instances provide the
62 utility result and traceback methods which retrieve the corresponding
62 utility result and traceback methods which retrieve the corresponding
63 information from the jobs list:
63 information from the jobs list:
64
64
65 jobs.result(N) <--> jobs[N].result
65 jobs.result(N) <--> jobs[N].result
66 jobs.traceback(N) <--> jobs[N].traceback()
66 jobs.traceback(N) <--> jobs[N].traceback()
67
67
68 While this appears minor, it allows you to use tab completion
68 While this appears minor, it allows you to use tab completion
69 interactively on the job manager instance.
69 interactively on the job manager instance.
70 """
70 """
71
71
72 def __init__(self):
72 def __init__(self):
73 # Lists for job management, accessed via a property to ensure they're
73 # Lists for job management, accessed via a property to ensure they're
74 # up to date.x
74 # up to date.x
75 self._running = []
75 self._running = []
76 self._completed = []
76 self._completed = []
77 self._dead = []
77 self._dead = []
78 # A dict of all jobs, so users can easily access any of them
78 # A dict of all jobs, so users can easily access any of them
79 self.all = {}
79 self.all = {}
80 # For reporting
80 # For reporting
81 self._comp_report = []
81 self._comp_report = []
82 self._dead_report = []
82 self._dead_report = []
83 # Store status codes locally for fast lookups
83 # Store status codes locally for fast lookups
84 self._s_created = BackgroundJobBase.stat_created_c
84 self._s_created = BackgroundJobBase.stat_created_c
85 self._s_running = BackgroundJobBase.stat_running_c
85 self._s_running = BackgroundJobBase.stat_running_c
86 self._s_completed = BackgroundJobBase.stat_completed_c
86 self._s_completed = BackgroundJobBase.stat_completed_c
87 self._s_dead = BackgroundJobBase.stat_dead_c
87 self._s_dead = BackgroundJobBase.stat_dead_c
88
88
89 @property
89 @property
90 def running(self):
90 def running(self):
91 self._update_status()
91 self._update_status()
92 return self._running
92 return self._running
93
93
94 @property
94 @property
95 def dead(self):
95 def dead(self):
96 self._update_status()
96 self._update_status()
97 return self._dead
97 return self._dead
98
98
99 @property
99 @property
100 def completed(self):
100 def completed(self):
101 self._update_status()
101 self._update_status()
102 return self._completed
102 return self._completed
103
103
104 def new(self, func_or_exp, *args, **kwargs):
104 def new(self, func_or_exp, *args, **kwargs):
105 """Add a new background job and start it in a separate thread.
105 """Add a new background job and start it in a separate thread.
106
106
107 There are two types of jobs which can be created:
107 There are two types of jobs which can be created:
108
108
109 1. Jobs based on expressions which can be passed to an eval() call.
109 1. Jobs based on expressions which can be passed to an eval() call.
110 The expression must be given as a string. For example:
110 The expression must be given as a string. For example:
111
111
112 job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])
112 job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])
113
113
114 The given expression is passed to eval(), along with the optional
114 The given expression is passed to eval(), along with the optional
115 global/local dicts provided. If no dicts are given, they are
115 global/local dicts provided. If no dicts are given, they are
116 extracted automatically from the caller's frame.
116 extracted automatically from the caller's frame.
117
117
118 A Python statement is NOT a valid eval() expression. Basically, you
118 A Python statement is NOT a valid eval() expression. Basically, you
119 can only use as an eval() argument something which can go on the right
119 can only use as an eval() argument something which can go on the right
120 of an '=' sign and be assigned to a variable.
120 of an '=' sign and be assigned to a variable.
121
121
122 For example,"print 'hello'" is not valid, but '2+3' is.
122 For example,"print 'hello'" is not valid, but '2+3' is.
123
123
124 2. Jobs given a function object, optionally passing additional
124 2. Jobs given a function object, optionally passing additional
125 positional arguments:
125 positional arguments:
126
126
127 job_manager.new(myfunc, x, y)
127 job_manager.new(myfunc, x, y)
128
128
129 The function is called with the given arguments.
129 The function is called with the given arguments.
130
130
131 If you need to pass keyword arguments to your function, you must
131 If you need to pass keyword arguments to your function, you must
132 supply them as a dict named kw:
132 supply them as a dict named kw:
133
133
134 job_manager.new(myfunc, x, y, kw=dict(z=1))
134 job_manager.new(myfunc, x, y, kw=dict(z=1))
135
135
136 The reason for this assymmetry is that the new() method needs to
136 The reason for this assymmetry is that the new() method needs to
137 maintain access to its own keywords, and this prevents name collisions
137 maintain access to its own keywords, and this prevents name collisions
138 between arguments to new() and arguments to your own functions.
138 between arguments to new() and arguments to your own functions.
139
139
140 In both cases, the result is stored in the job.result field of the
140 In both cases, the result is stored in the job.result field of the
141 background job object.
141 background job object.
142
142
143 You can set `daemon` attribute of the thread by giving the keyword
144 argument `daemon`.
143
145
144 Notes and caveats:
146 Notes and caveats:
145
147
146 1. All threads running share the same standard output. Thus, if your
148 1. All threads running share the same standard output. Thus, if your
147 background jobs generate output, it will come out on top of whatever
149 background jobs generate output, it will come out on top of whatever
148 you are currently writing. For this reason, background jobs are best
150 you are currently writing. For this reason, background jobs are best
149 used with silent functions which simply return their output.
151 used with silent functions which simply return their output.
150
152
151 2. Threads also all work within the same global namespace, and this
153 2. Threads also all work within the same global namespace, and this
152 system does not lock interactive variables. So if you send job to the
154 system does not lock interactive variables. So if you send job to the
153 background which operates on a mutable object for a long time, and
155 background which operates on a mutable object for a long time, and
154 start modifying that same mutable object interactively (or in another
156 start modifying that same mutable object interactively (or in another
155 backgrounded job), all sorts of bizarre behaviour will occur.
157 backgrounded job), all sorts of bizarre behaviour will occur.
156
158
157 3. If a background job is spending a lot of time inside a C extension
159 3. If a background job is spending a lot of time inside a C extension
158 module which does not release the Python Global Interpreter Lock
160 module which does not release the Python Global Interpreter Lock
159 (GIL), this will block the IPython prompt. This is simply because the
161 (GIL), this will block the IPython prompt. This is simply because the
160 Python interpreter can only switch between threads at Python
162 Python interpreter can only switch between threads at Python
161 bytecodes. While the execution is inside C code, the interpreter must
163 bytecodes. While the execution is inside C code, the interpreter must
162 simply wait unless the extension module releases the GIL.
164 simply wait unless the extension module releases the GIL.
163
165
164 4. There is no way, due to limitations in the Python threads library,
166 4. There is no way, due to limitations in the Python threads library,
165 to kill a thread once it has started."""
167 to kill a thread once it has started."""
166
168
167 if callable(func_or_exp):
169 if callable(func_or_exp):
168 kw = kwargs.get('kw',{})
170 kw = kwargs.get('kw',{})
169 job = BackgroundJobFunc(func_or_exp,*args,**kw)
171 job = BackgroundJobFunc(func_or_exp,*args,**kw)
170 elif isinstance(func_or_exp, basestring):
172 elif isinstance(func_or_exp, basestring):
171 if not args:
173 if not args:
172 frame = sys._getframe(1)
174 frame = sys._getframe(1)
173 glob, loc = frame.f_globals, frame.f_locals
175 glob, loc = frame.f_globals, frame.f_locals
174 elif len(args)==1:
176 elif len(args)==1:
175 glob = loc = args[0]
177 glob = loc = args[0]
176 elif len(args)==2:
178 elif len(args)==2:
177 glob,loc = args
179 glob,loc = args
178 else:
180 else:
179 raise ValueError(
181 raise ValueError(
180 'Expression jobs take at most 2 args (globals,locals)')
182 'Expression jobs take at most 2 args (globals,locals)')
181 job = BackgroundJobExpr(func_or_exp, glob, loc)
183 job = BackgroundJobExpr(func_or_exp, glob, loc)
182 else:
184 else:
183 raise TypeError('invalid args for new job')
185 raise TypeError('invalid args for new job')
184
186
187 if kwargs.get('daemon', False):
188 job.daemon = True
185 job.num = len(self.all)+1 if self.all else 0
189 job.num = len(self.all)+1 if self.all else 0
186 self.running.append(job)
190 self.running.append(job)
187 self.all[job.num] = job
191 self.all[job.num] = job
188 print 'Starting job # %s in a separate thread.' % job.num
192 print 'Starting job # %s in a separate thread.' % job.num
189 job.start()
193 job.start()
190 return job
194 return job
191
195
192 def __getitem__(self, job_key):
196 def __getitem__(self, job_key):
193 num = job_key if isinstance(job_key, int) else job_key.num
197 num = job_key if isinstance(job_key, int) else job_key.num
194 return self.all[num]
198 return self.all[num]
195
199
196 def __call__(self):
200 def __call__(self):
197 """An alias to self.status(),
201 """An alias to self.status(),
198
202
199 This allows you to simply call a job manager instance much like the
203 This allows you to simply call a job manager instance much like the
200 Unix `jobs` shell command."""
204 Unix `jobs` shell command."""
201
205
202 return self.status()
206 return self.status()
203
207
204 def _update_status(self):
208 def _update_status(self):
205 """Update the status of the job lists.
209 """Update the status of the job lists.
206
210
207 This method moves finished jobs to one of two lists:
211 This method moves finished jobs to one of two lists:
208 - self.completed: jobs which completed successfully
212 - self.completed: jobs which completed successfully
209 - self.dead: jobs which finished but died.
213 - self.dead: jobs which finished but died.
210
214
211 It also copies those jobs to corresponding _report lists. These lists
215 It also copies those jobs to corresponding _report lists. These lists
212 are used to report jobs completed/dead since the last update, and are
216 are used to report jobs completed/dead since the last update, and are
213 then cleared by the reporting function after each call."""
217 then cleared by the reporting function after each call."""
214
218
215 # Status codes
219 # Status codes
216 srun, scomp, sdead = self._s_running, self._s_completed, self._s_dead
220 srun, scomp, sdead = self._s_running, self._s_completed, self._s_dead
217 # State lists, use the actual lists b/c the public names are properties
221 # State lists, use the actual lists b/c the public names are properties
218 # that call this very function on access
222 # that call this very function on access
219 running, completed, dead = self._running, self._completed, self._dead
223 running, completed, dead = self._running, self._completed, self._dead
220
224
221 # Now, update all state lists
225 # Now, update all state lists
222 for num, job in enumerate(running):
226 for num, job in enumerate(running):
223 stat = job.stat_code
227 stat = job.stat_code
224 if stat == srun:
228 if stat == srun:
225 continue
229 continue
226 elif stat == scomp:
230 elif stat == scomp:
227 completed.append(job)
231 completed.append(job)
228 self._comp_report.append(job)
232 self._comp_report.append(job)
229 running[num] = False
233 running[num] = False
230 elif stat == sdead:
234 elif stat == sdead:
231 dead.append(job)
235 dead.append(job)
232 self._dead_report.append(job)
236 self._dead_report.append(job)
233 running[num] = False
237 running[num] = False
234 # Remove dead/completed jobs from running list
238 # Remove dead/completed jobs from running list
235 running[:] = filter(None, running)
239 running[:] = filter(None, running)
236
240
237 def _group_report(self,group,name):
241 def _group_report(self,group,name):
238 """Report summary for a given job group.
242 """Report summary for a given job group.
239
243
240 Return True if the group had any elements."""
244 Return True if the group had any elements."""
241
245
242 if group:
246 if group:
243 print '%s jobs:' % name
247 print '%s jobs:' % name
244 for job in group:
248 for job in group:
245 print '%s : %s' % (job.num,job)
249 print '%s : %s' % (job.num,job)
246 print
250 print
247 return True
251 return True
248
252
249 def _group_flush(self,group,name):
253 def _group_flush(self,group,name):
250 """Flush a given job group
254 """Flush a given job group
251
255
252 Return True if the group had any elements."""
256 Return True if the group had any elements."""
253
257
254 njobs = len(group)
258 njobs = len(group)
255 if njobs:
259 if njobs:
256 plural = {1:''}.setdefault(njobs,'s')
260 plural = {1:''}.setdefault(njobs,'s')
257 print 'Flushing %s %s job%s.' % (njobs,name,plural)
261 print 'Flushing %s %s job%s.' % (njobs,name,plural)
258 group[:] = []
262 group[:] = []
259 return True
263 return True
260
264
261 def _status_new(self):
265 def _status_new(self):
262 """Print the status of newly finished jobs.
266 """Print the status of newly finished jobs.
263
267
264 Return True if any new jobs are reported.
268 Return True if any new jobs are reported.
265
269
266 This call resets its own state every time, so it only reports jobs
270 This call resets its own state every time, so it only reports jobs
267 which have finished since the last time it was called."""
271 which have finished since the last time it was called."""
268
272
269 self._update_status()
273 self._update_status()
270 new_comp = self._group_report(self._comp_report, 'Completed')
274 new_comp = self._group_report(self._comp_report, 'Completed')
271 new_dead = self._group_report(self._dead_report,
275 new_dead = self._group_report(self._dead_report,
272 'Dead, call jobs.traceback() for details')
276 'Dead, call jobs.traceback() for details')
273 self._comp_report[:] = []
277 self._comp_report[:] = []
274 self._dead_report[:] = []
278 self._dead_report[:] = []
275 return new_comp or new_dead
279 return new_comp or new_dead
276
280
277 def status(self,verbose=0):
281 def status(self,verbose=0):
278 """Print a status of all jobs currently being managed."""
282 """Print a status of all jobs currently being managed."""
279
283
280 self._update_status()
284 self._update_status()
281 self._group_report(self.running,'Running')
285 self._group_report(self.running,'Running')
282 self._group_report(self.completed,'Completed')
286 self._group_report(self.completed,'Completed')
283 self._group_report(self.dead,'Dead')
287 self._group_report(self.dead,'Dead')
284 # Also flush the report queues
288 # Also flush the report queues
285 self._comp_report[:] = []
289 self._comp_report[:] = []
286 self._dead_report[:] = []
290 self._dead_report[:] = []
287
291
288 def remove(self,num):
292 def remove(self,num):
289 """Remove a finished (completed or dead) job."""
293 """Remove a finished (completed or dead) job."""
290
294
291 try:
295 try:
292 job = self.all[num]
296 job = self.all[num]
293 except KeyError:
297 except KeyError:
294 error('Job #%s not found' % num)
298 error('Job #%s not found' % num)
295 else:
299 else:
296 stat_code = job.stat_code
300 stat_code = job.stat_code
297 if stat_code == self._s_running:
301 if stat_code == self._s_running:
298 error('Job #%s is still running, it can not be removed.' % num)
302 error('Job #%s is still running, it can not be removed.' % num)
299 return
303 return
300 elif stat_code == self._s_completed:
304 elif stat_code == self._s_completed:
301 self.completed.remove(job)
305 self.completed.remove(job)
302 elif stat_code == self._s_dead:
306 elif stat_code == self._s_dead:
303 self.dead.remove(job)
307 self.dead.remove(job)
304
308
305 def flush(self):
309 def flush(self):
306 """Flush all finished jobs (completed and dead) from lists.
310 """Flush all finished jobs (completed and dead) from lists.
307
311
308 Running jobs are never flushed.
312 Running jobs are never flushed.
309
313
310 It first calls _status_new(), to update info. If any jobs have
314 It first calls _status_new(), to update info. If any jobs have
311 completed since the last _status_new() call, the flush operation
315 completed since the last _status_new() call, the flush operation
312 aborts."""
316 aborts."""
313
317
314 # Remove the finished jobs from the master dict
318 # Remove the finished jobs from the master dict
315 alljobs = self.all
319 alljobs = self.all
316 for job in self.completed+self.dead:
320 for job in self.completed+self.dead:
317 del(alljobs[job.num])
321 del(alljobs[job.num])
318
322
319 # Now flush these lists completely
323 # Now flush these lists completely
320 fl_comp = self._group_flush(self.completed, 'Completed')
324 fl_comp = self._group_flush(self.completed, 'Completed')
321 fl_dead = self._group_flush(self.dead, 'Dead')
325 fl_dead = self._group_flush(self.dead, 'Dead')
322 if not (fl_comp or fl_dead):
326 if not (fl_comp or fl_dead):
323 print 'No jobs to flush.'
327 print 'No jobs to flush.'
324
328
325 def result(self,num):
329 def result(self,num):
326 """result(N) -> return the result of job N."""
330 """result(N) -> return the result of job N."""
327 try:
331 try:
328 return self.all[num].result
332 return self.all[num].result
329 except KeyError:
333 except KeyError:
330 error('Job #%s not found' % num)
334 error('Job #%s not found' % num)
331
335
332 def _traceback(self, job):
336 def _traceback(self, job):
333 num = job if isinstance(job, int) else job.num
337 num = job if isinstance(job, int) else job.num
334 try:
338 try:
335 self.all[num].traceback()
339 self.all[num].traceback()
336 except KeyError:
340 except KeyError:
337 error('Job #%s not found' % num)
341 error('Job #%s not found' % num)
338
342
339 def traceback(self, job=None):
343 def traceback(self, job=None):
340 if job is None:
344 if job is None:
341 self._update_status()
345 self._update_status()
342 for deadjob in self.dead:
346 for deadjob in self.dead:
343 print "Traceback for: %r" % deadjob
347 print "Traceback for: %r" % deadjob
344 self._traceback(deadjob)
348 self._traceback(deadjob)
345 print
349 print
346 else:
350 else:
347 self._traceback(job)
351 self._traceback(job)
348
352
349
353
350 class BackgroundJobBase(threading.Thread):
354 class BackgroundJobBase(threading.Thread):
351 """Base class to build BackgroundJob classes.
355 """Base class to build BackgroundJob classes.
352
356
353 The derived classes must implement:
357 The derived classes must implement:
354
358
355 - Their own __init__, since the one here raises NotImplementedError. The
359 - Their own __init__, since the one here raises NotImplementedError. The
356 derived constructor must call self._init() at the end, to provide common
360 derived constructor must call self._init() at the end, to provide common
357 initialization.
361 initialization.
358
362
359 - A strform attribute used in calls to __str__.
363 - A strform attribute used in calls to __str__.
360
364
361 - A call() method, which will make the actual execution call and must
365 - A call() method, which will make the actual execution call and must
362 return a value to be held in the 'result' field of the job object."""
366 return a value to be held in the 'result' field of the job object."""
363
367
364 # Class constants for status, in string and as numerical codes (when
368 # Class constants for status, in string and as numerical codes (when
365 # updating jobs lists, we don't want to do string comparisons). This will
369 # updating jobs lists, we don't want to do string comparisons). This will
366 # be done at every user prompt, so it has to be as fast as possible
370 # be done at every user prompt, so it has to be as fast as possible
367 stat_created = 'Created'; stat_created_c = 0
371 stat_created = 'Created'; stat_created_c = 0
368 stat_running = 'Running'; stat_running_c = 1
372 stat_running = 'Running'; stat_running_c = 1
369 stat_completed = 'Completed'; stat_completed_c = 2
373 stat_completed = 'Completed'; stat_completed_c = 2
370 stat_dead = 'Dead (Exception), call jobs.traceback() for details'
374 stat_dead = 'Dead (Exception), call jobs.traceback() for details'
371 stat_dead_c = -1
375 stat_dead_c = -1
372
376
373 def __init__(self):
377 def __init__(self):
374 raise NotImplementedError, \
378 raise NotImplementedError, \
375 "This class can not be instantiated directly."
379 "This class can not be instantiated directly."
376
380
377 def _init(self):
381 def _init(self):
378 """Common initialization for all BackgroundJob objects"""
382 """Common initialization for all BackgroundJob objects"""
379
383
380 for attr in ['call','strform']:
384 for attr in ['call','strform']:
381 assert hasattr(self,attr), "Missing attribute <%s>" % attr
385 assert hasattr(self,attr), "Missing attribute <%s>" % attr
382
386
383 # The num tag can be set by an external job manager
387 # The num tag can be set by an external job manager
384 self.num = None
388 self.num = None
385
389
386 self.status = BackgroundJobBase.stat_created
390 self.status = BackgroundJobBase.stat_created
387 self.stat_code = BackgroundJobBase.stat_created_c
391 self.stat_code = BackgroundJobBase.stat_created_c
388 self.finished = False
392 self.finished = False
389 self.result = '<BackgroundJob has not completed>'
393 self.result = '<BackgroundJob has not completed>'
390
394
391 # reuse the ipython traceback handler if we can get to it, otherwise
395 # reuse the ipython traceback handler if we can get to it, otherwise
392 # make a new one
396 # make a new one
393 try:
397 try:
394 make_tb = get_ipython().InteractiveTB.text
398 make_tb = get_ipython().InteractiveTB.text
395 except:
399 except:
396 make_tb = AutoFormattedTB(mode = 'Context',
400 make_tb = AutoFormattedTB(mode = 'Context',
397 color_scheme='NoColor',
401 color_scheme='NoColor',
398 tb_offset = 1).text
402 tb_offset = 1).text
399 # Note that the actual API for text() requires the three args to be
403 # Note that the actual API for text() requires the three args to be
400 # passed in, so we wrap it in a simple lambda.
404 # passed in, so we wrap it in a simple lambda.
401 self._make_tb = lambda : make_tb(None, None, None)
405 self._make_tb = lambda : make_tb(None, None, None)
402
406
403 # Hold a formatted traceback if one is generated.
407 # Hold a formatted traceback if one is generated.
404 self._tb = None
408 self._tb = None
405
409
406 threading.Thread.__init__(self)
410 threading.Thread.__init__(self)
407
411
408 def __str__(self):
412 def __str__(self):
409 return self.strform
413 return self.strform
410
414
411 def __repr__(self):
415 def __repr__(self):
412 return '<BackgroundJob #%d: %s>' % (self.num, self.strform)
416 return '<BackgroundJob #%d: %s>' % (self.num, self.strform)
413
417
414 def traceback(self):
418 def traceback(self):
415 print self._tb
419 print self._tb
416
420
417 def run(self):
421 def run(self):
418 try:
422 try:
419 self.status = BackgroundJobBase.stat_running
423 self.status = BackgroundJobBase.stat_running
420 self.stat_code = BackgroundJobBase.stat_running_c
424 self.stat_code = BackgroundJobBase.stat_running_c
421 self.result = self.call()
425 self.result = self.call()
422 except:
426 except:
423 self.status = BackgroundJobBase.stat_dead
427 self.status = BackgroundJobBase.stat_dead
424 self.stat_code = BackgroundJobBase.stat_dead_c
428 self.stat_code = BackgroundJobBase.stat_dead_c
425 self.finished = None
429 self.finished = None
426 self.result = ('<BackgroundJob died, call jobs.traceback() for details>')
430 self.result = ('<BackgroundJob died, call jobs.traceback() for details>')
427 self._tb = self._make_tb()
431 self._tb = self._make_tb()
428 else:
432 else:
429 self.status = BackgroundJobBase.stat_completed
433 self.status = BackgroundJobBase.stat_completed
430 self.stat_code = BackgroundJobBase.stat_completed_c
434 self.stat_code = BackgroundJobBase.stat_completed_c
431 self.finished = True
435 self.finished = True
432
436
433
437
434 class BackgroundJobExpr(BackgroundJobBase):
438 class BackgroundJobExpr(BackgroundJobBase):
435 """Evaluate an expression as a background job (uses a separate thread)."""
439 """Evaluate an expression as a background job (uses a separate thread)."""
436
440
437 def __init__(self, expression, glob=None, loc=None):
441 def __init__(self, expression, glob=None, loc=None):
438 """Create a new job from a string which can be fed to eval().
442 """Create a new job from a string which can be fed to eval().
439
443
440 global/locals dicts can be provided, which will be passed to the eval
444 global/locals dicts can be provided, which will be passed to the eval
441 call."""
445 call."""
442
446
443 # fail immediately if the given expression can't be compiled
447 # fail immediately if the given expression can't be compiled
444 self.code = compile(expression,'<BackgroundJob compilation>','eval')
448 self.code = compile(expression,'<BackgroundJob compilation>','eval')
445
449
446 glob = {} if glob is None else glob
450 glob = {} if glob is None else glob
447 loc = {} if loc is None else loc
451 loc = {} if loc is None else loc
448 self.expression = self.strform = expression
452 self.expression = self.strform = expression
449 self.glob = glob
453 self.glob = glob
450 self.loc = loc
454 self.loc = loc
451 self._init()
455 self._init()
452
456
453 def call(self):
457 def call(self):
454 return eval(self.code,self.glob,self.loc)
458 return eval(self.code,self.glob,self.loc)
455
459
456
460
457 class BackgroundJobFunc(BackgroundJobBase):
461 class BackgroundJobFunc(BackgroundJobBase):
458 """Run a function call as a background job (uses a separate thread)."""
462 """Run a function call as a background job (uses a separate thread)."""
459
463
460 def __init__(self, func, *args, **kwargs):
464 def __init__(self, func, *args, **kwargs):
461 """Create a new job from a callable object.
465 """Create a new job from a callable object.
462
466
463 Any positional arguments and keyword args given to this constructor
467 Any positional arguments and keyword args given to this constructor
464 after the initial callable are passed directly to it."""
468 after the initial callable are passed directly to it."""
465
469
466 if not callable(func):
470 if not callable(func):
467 raise TypeError(
471 raise TypeError(
468 'first argument to BackgroundJobFunc must be callable')
472 'first argument to BackgroundJobFunc must be callable')
469
473
470 self.func = func
474 self.func = func
471 self.args = args
475 self.args = args
472 self.kwargs = kwargs
476 self.kwargs = kwargs
473 # The string form will only include the function passed, because
477 # The string form will only include the function passed, because
474 # generating string representations of the arguments is a potentially
478 # generating string representations of the arguments is a potentially
475 # _very_ expensive operation (e.g. with large arrays).
479 # _very_ expensive operation (e.g. with large arrays).
476 self.strform = str(func)
480 self.strform = str(func)
477 self._init()
481 self._init()
478
482
479 def call(self):
483 def call(self):
480 return self.func(*self.args, **self.kwargs)
484 return self.func(*self.args, **self.kwargs)
General Comments 0
You need to be logged in to leave comments. Login now