@@ -1,490 +1,490 @@
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """Manage background (threaded) jobs conveniently from an interactive shell.
2 """Manage background (threaded) jobs conveniently from an interactive shell.
3
3
4 This module provides a BackgroundJobManager class. This is the main class
4 This module provides a BackgroundJobManager class. This is the main class
5 meant for public usage; it implements an object which can create and manage
5 meant for public usage; it implements an object which can create and manage
6 new background jobs.
6 new background jobs.
7
7
8 It also provides the actual job classes managed by these BackgroundJobManager
8 It also provides the actual job classes managed by these BackgroundJobManager
9 objects, see their docstrings below.
9 objects, see their docstrings below.
10
10
11
11
12 This system was inspired by discussions with B. Granger and the
12 This system was inspired by discussions with B. Granger and the
13 BackgroundCommand class described in the book Python Scripting for
13 BackgroundCommand class described in the book Python Scripting for
14 Computational Science, by H. P. Langtangen:
14 Computational Science, by H. P. Langtangen:
15
15
16 http://folk.uio.no/hpl/scripting
16 http://folk.uio.no/hpl/scripting
17
17
18 (although ultimately no code from this text was used, as IPython's system is a
18 (although ultimately no code from this text was used, as IPython's system is a
19 separate implementation).
19 separate implementation).
20 """
20 """
21
21
22 #*****************************************************************************
22 #*****************************************************************************
23 # Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
23 # Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
24 #
24 #
25 # Distributed under the terms of the BSD License. The full license is in
25 # Distributed under the terms of the BSD License. The full license is in
26 # the file COPYING, distributed as part of this software.
26 # the file COPYING, distributed as part of this software.
27 #*****************************************************************************
27 #*****************************************************************************
28
28
29 # Code begins
29 # Code begins
30 import sys
30 import sys
31 import threading
31 import threading
32
32
33 from IPython.ultraTB import AutoFormattedTB
33 from IPython.ultraTB import AutoFormattedTB
34 from IPython.genutils import warn,error
34 from IPython.genutils import warn,error
35
35
36 class BackgroundJobManager:
36 class BackgroundJobManager:
37 """Class to manage a pool of backgrounded threaded jobs.
37 """Class to manage a pool of backgrounded threaded jobs.
38
38
39 Below, we assume that 'jobs' is a BackgroundJobManager instance.
39 Below, we assume that 'jobs' is a BackgroundJobManager instance.
40
40
41 Usage summary (see the method docstrings for details):
41 Usage summary (see the method docstrings for details):
42
42
43 jobs.new(...) -> start a new job
43 jobs.new(...) -> start a new job
44
44
45 jobs() or jobs.status() -> print status summary of all jobs
45 jobs() or jobs.status() -> print status summary of all jobs
46
46
47 jobs[N] -> returns job number N.
47 jobs[N] -> returns job number N.
48
48
49 foo = jobs[N].result -> assign to variable foo the result of job N
49 foo = jobs[N].result -> assign to variable foo the result of job N
50
50
51 jobs[N].traceback() -> print the traceback of dead job N
51 jobs[N].traceback() -> print the traceback of dead job N
52
52
53 jobs.remove(N) -> remove (finished) job N
53 jobs.remove(N) -> remove (finished) job N
54
54
55 jobs.flush_finished() -> remove all finished jobs
55 jobs.flush_finished() -> remove all finished jobs
56
56
57 As a convenience feature, BackgroundJobManager instances provide the
57 As a convenience feature, BackgroundJobManager instances provide the
58 utility result and traceback methods which retrieve the corresponding
58 utility result and traceback methods which retrieve the corresponding
59 information from the jobs list:
59 information from the jobs list:
60
60
61 jobs.result(N) <--> jobs[N].result
61 jobs.result(N) <--> jobs[N].result
62 jobs.traceback(N) <--> jobs[N].traceback()
62 jobs.traceback(N) <--> jobs[N].traceback()
63
63
64 While this appears minor, it allows you to use tab completion
64 While this appears minor, it allows you to use tab completion
65 interactively on the job manager instance.
65 interactively on the job manager instance.
66
66
67 In interactive mode, IPython provides the magic function %bg for quick
67 In interactive mode, IPython provides the magic function %bg for quick
68 creation of backgrounded expression-based jobs. Type bg? for details."""
68 creation of backgrounded expression-based jobs. Type bg? for details."""
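[Reviewer note] A minimal usage sketch of the interface summarized above; the slow_add helper and the import path are assumptions for illustration only (the module's location varies across IPython versions):

    def slow_add(x, y):
        import time
        time.sleep(5)
        return x + y

    from IPython import background_jobs   # assumed import path
    jobs = background_jobs.BackgroundJobManager()

    jobs.new(slow_add, 2, 3)        # job # 0: function-based job
    jobs.new('slow_add(10, 20)')    # job # 1: expression-based job (eval'd string)

    jobs()                          # same as jobs.status(): print a summary
    print jobs[0].result            # placeholder string until the job completes
    jobs.flush_finished()           # drop completed/dead jobs from the lists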
69
69
70 def __init__(self):
70 def __init__(self):
71 # Lists for job management
71 # Lists for job management
72 self.jobs_run = []
72 self.jobs_run = []
73 self.jobs_comp = []
73 self.jobs_comp = []
74 self.jobs_dead = []
74 self.jobs_dead = []
75 # A dict of all jobs, so users can easily access any of them
75 # A dict of all jobs, so users can easily access any of them
76 self.jobs_all = {}
76 self.jobs_all = {}
77 # For reporting
77 # For reporting
78 self._comp_report = []
78 self._comp_report = []
79 self._dead_report = []
79 self._dead_report = []
80 # Store status codes locally for fast lookups
80 # Store status codes locally for fast lookups
81 self._s_created = BackgroundJobBase.stat_created_c
81 self._s_created = BackgroundJobBase.stat_created_c
82 self._s_running = BackgroundJobBase.stat_running_c
82 self._s_running = BackgroundJobBase.stat_running_c
83 self._s_completed = BackgroundJobBase.stat_completed_c
83 self._s_completed = BackgroundJobBase.stat_completed_c
84 self._s_dead = BackgroundJobBase.stat_dead_c
84 self._s_dead = BackgroundJobBase.stat_dead_c
85
85
86 def new(self,func_or_exp,*args,**kwargs):
86 def new(self,func_or_exp,*args,**kwargs):
87 """Add a new background job and start it in a separate thread.
87 """Add a new background job and start it in a separate thread.
88
88
89 There are two types of jobs which can be created:
89 There are two types of jobs which can be created:
90
90
91 1. Jobs based on expressions which can be passed to an eval() call.
91 1. Jobs based on expressions which can be passed to an eval() call.
92 The expression must be given as a string. For example:
92 The expression must be given as a string. For example:
93
93
94 job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])
94 job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])
95
95
96 The given expression is passed to eval(), along with the optional
96 The given expression is passed to eval(), along with the optional
97 global/local dicts provided. If no dicts are given, they are
97 global/local dicts provided. If no dicts are given, they are
98 extracted automatically from the caller's frame.
98 extracted automatically from the caller's frame.
99
99
100 A Python statement is NOT a valid eval() expression. Basically, you
100 A Python statement is NOT a valid eval() expression. Basically, you
101 can only use as an eval() argument something which can go on the right
101 can only use as an eval() argument something which can go on the right
102 of an '=' sign and be assigned to a variable.
102 of an '=' sign and be assigned to a variable.
103
103
104 For example, "print 'hello'" is not valid, but '2+3' is.
104 For example, "print 'hello'" is not valid, but '2+3' is.
105
105
106 2. Jobs given a function object, optionally passing additional
106 2. Jobs given a function object, optionally passing additional
107 positional arguments:
107 positional arguments:
108
108
109 job_manager.new(myfunc,x,y)
109 job_manager.new(myfunc,x,y)
110
110
111 The function is called with the given arguments.
111 The function is called with the given arguments.
112
112
113 If you need to pass keyword arguments to your function, you must
113 If you need to pass keyword arguments to your function, you must
114 supply them as a dict named kw:
114 supply them as a dict named kw:
115
115
116 job_manager.new(myfunc,x,y,kw=dict(z=1))
116 job_manager.new(myfunc,x,y,kw=dict(z=1))
117
117
118 The reason for this asymmetry is that the new() method needs to
118 The reason for this asymmetry is that the new() method needs to
119 maintain access to its own keywords, and this prevents name collisions
119 maintain access to its own keywords, and this prevents name collisions
120 between arguments to new() and arguments to your own functions.
120 between arguments to new() and arguments to your own functions.
121
121
122 In both cases, the result is stored in the job.result field of the
122 In both cases, the result is stored in the job.result field of the
123 background job object.
123 background job object.
124
124
125
125
126 Notes and caveats:
126 Notes and caveats:
127
127
128 1. All threads running share the same standard output. Thus, if your
128 1. All threads running share the same standard output. Thus, if your
129 background jobs generate output, it will come out on top of whatever
129 background jobs generate output, it will come out on top of whatever
130 you are currently writing. For this reason, background jobs are best
130 you are currently writing. For this reason, background jobs are best
131 used with silent functions which simply return their output.
131 used with silent functions which simply return their output.
132
132
133 2. Threads also all work within the same global namespace, and this
133 2. Threads also all work within the same global namespace, and this
134 system does not lock interactive variables. So if you send a job to the
134 system does not lock interactive variables. So if you send a job to the
135 background which operates on a mutable object for a long time, and
135 background which operates on a mutable object for a long time, and
136 start modifying that same mutable object interactively (or in another
136 start modifying that same mutable object interactively (or in another
137 backgrounded job), all sorts of bizarre behaviour will occur.
137 backgrounded job), all sorts of bizarre behaviour will occur.
138
138
139 3. If a background job is spending a lot of time inside a C extension
139 3. If a background job is spending a lot of time inside a C extension
140 module which does not release the Python Global Interpreter Lock
140 module which does not release the Python Global Interpreter Lock
141 (GIL), this will block the IPython prompt. This is simply because the
141 (GIL), this will block the IPython prompt. This is simply because the
142 Python interpreter can only switch between threads at Python
142 Python interpreter can only switch between threads at Python
143 bytecodes. While the execution is inside C code, the interpreter must
143 bytecodes. While the execution is inside C code, the interpreter must
144 simply wait unless the extension module releases the GIL.
144 simply wait unless the extension module releases the GIL.
145
145
146 4. There is no way, due to limitations in the Python threads library,
146 4. There is no way, due to limitations in the Python threads library,
147 to kill a thread once it has started."""
147 to kill a thread once it has started."""
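[Reviewer note] A compact sketch of the two creation styles described above; myfunc and its arguments are placeholders, and jobs is assumed to be a BackgroundJobManager instance:

    def myfunc(x, y, z=0):
        return x + y + z

    # 1. Expression job: a string handed to eval(), optionally with explicit
    #    globals/locals (otherwise taken from the caller's frame).
    jobs.new('myfunc(2, 3)')
    jobs.new('myfunc(2, 3)', globals(), locals())

    # 2. Function job: positional args follow the callable; keyword args must
    #    be wrapped in a dict passed as kw, to avoid clashes with new()'s own.
    jobs.new(myfunc, 2, 3, kw=dict(z=1))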
148
148
149 if callable(func_or_exp):
149 if callable(func_or_exp):
150 kw = kwargs.get('kw',{})
150 kw = kwargs.get('kw',{})
151 job = BackgroundJobFunc(func_or_exp,*args,**kw)
151 job = BackgroundJobFunc(func_or_exp,*args,**kw)
152 elif isinstance(func_or_exp,basestring):
152 elif isinstance(func_or_exp,basestring):
153 if not args:
153 if not args:
154 frame = sys._getframe(1)
154 frame = sys._getframe(1)
155 glob, loc = frame.f_globals, frame.f_locals
155 glob, loc = frame.f_globals, frame.f_locals
156 elif len(args)==1:
156 elif len(args)==1:
157 glob = loc = args[0]
157 glob = loc = args[0]
158 elif len(args)==2:
158 elif len(args)==2:
159 glob,loc = args
159 glob,loc = args
160 else:
160 else:
161 raise ValueError,\
161 raise ValueError,\
162 'Expression jobs take at most 2 args (globals,locals)'
162 'Expression jobs take at most 2 args (globals,locals)'
163 job = BackgroundJobExpr(func_or_exp,glob,loc)
163 job = BackgroundJobExpr(func_or_exp,glob,loc)
164 else:
164 else:
165 raise TypeError, 'first argument must be a callable or a string expression'
165 raise TypeError, 'first argument must be a callable or a string expression'
166 jkeys = self.jobs_all.keys()
166 jkeys = self.jobs_all.keys()
167 if jkeys:
167 if jkeys:
168 job.num = max(jkeys)+1
168 job.num = max(jkeys)+1
169 else:
169 else:
170 job.num = 0
170 job.num = 0
171 self.jobs_run.append(job)
171 self.jobs_run.append(job)
172 self.jobs_all[job.num] = job
172 self.jobs_all[job.num] = job
173 print 'Starting job # %s in a separate thread.' % job.num
173 print 'Starting job # %s in a separate thread.' % job.num
174 job.start()
174 job.start()
175 return job
175 return job
176
176
177 def __getitem__(self,key):
177 def __getitem__(self,key):
178 return self.jobs_all[key]
178 return self.jobs_all[key]
179
179
180 def __call__(self):
180 def __call__(self):
181 """An alias to self.status(),
181 """An alias to self.status(),
182
182
183 This allows you to simply call a job manager instance much like the
183 This allows you to simply call a job manager instance much like the
184 Unix jobs shell command."""
184 Unix jobs shell command."""
185
185
186 return self.status()
186 return self.status()
187
187
188 def _update_status(self):
188 def _update_status(self):
189 """Update the status of the job lists.
189 """Update the status of the job lists.
190
190
191 This method moves finished jobs to one of two lists:
191 This method moves finished jobs to one of two lists:
192 - self.jobs_comp: jobs which completed successfully
192 - self.jobs_comp: jobs which completed successfully
193 - self.jobs_dead: jobs which finished but died.
193 - self.jobs_dead: jobs which finished but died.
194
194
195 It also copies those jobs to corresponding _report lists. These lists
195 It also copies those jobs to corresponding _report lists. These lists
196 are used to report jobs completed/dead since the last update, and are
196 are used to report jobs completed/dead since the last update, and are
197 then cleared by the reporting function after each call."""
197 then cleared by the reporting function after each call."""
198
198
199 run,comp,dead = self._s_running,self._s_completed,self._s_dead
199 run,comp,dead = self._s_running,self._s_completed,self._s_dead
200 jobs_run = self.jobs_run
200 jobs_run = self.jobs_run
201 for num in range(len(jobs_run)):
201 for num in range(len(jobs_run)):
202 job = jobs_run[num]
202 job = jobs_run[num]
203 stat = job.stat_code
203 stat = job.stat_code
204 if stat == run:
204 if stat == run:
205 continue
205 continue
206 elif stat == comp:
206 elif stat == comp:
207 self.jobs_comp.append(job)
207 self.jobs_comp.append(job)
208 self._comp_report.append(job)
208 self._comp_report.append(job)
209 jobs_run[num] = False
209 jobs_run[num] = False
210 elif stat == dead:
210 elif stat == dead:
211 self.jobs_dead.append(job)
211 self.jobs_dead.append(job)
212 self._dead_report.append(job)
212 self._dead_report.append(job)
213 jobs_run[num] = False
213 jobs_run[num] = False
214 self.jobs_run = filter(None,self.jobs_run)
214 self.jobs_run = filter(None,self.jobs_run)
215
215
216 def _group_report(self,group,name):
216 def _group_report(self,group,name):
217 """Report summary for a given job group.
217 """Report summary for a given job group.
218
218
219 Return True if the group had any elements."""
219 Return True if the group had any elements."""
220
220
221 if group:
221 if group:
222 print '%s jobs:' % name
222 print '%s jobs:' % name
223 for job in group:
223 for job in group:
224 print '%s : %s' % (job.num,job)
224 print '%s : %s' % (job.num,job)
225 print
225 print
226 return True
226 return True
227
227
228 def _group_flush(self,group,name):
228 def _group_flush(self,group,name):
229 """Flush a given job group
229 """Flush a given job group
230
230
231 Return True if the group had any elements."""
231 Return True if the group had any elements."""
232
232
233 njobs = len(group)
233 njobs = len(group)
234 if njobs:
234 if njobs:
235 plural = {1:''}.setdefault(njobs,'s')
235 plural = {1:''}.setdefault(njobs,'s')
236 print 'Flushing %s %s job%s.' % (njobs,name,plural)
236 print 'Flushing %s %s job%s.' % (njobs,name,plural)
237 group[:] = []
237 group[:] = []
238 return True
238 return True
239
239
240 def _status_new(self):
240 def _status_new(self):
241 """Print the status of newly finished jobs.
241 """Print the status of newly finished jobs.
242
242
243 Return True if any new jobs are reported.
243 Return True if any new jobs are reported.
244
244
245 This call resets its own state every time, so it only reports jobs
245 This call resets its own state every time, so it only reports jobs
246 which have finished since the last time it was called."""
246 which have finished since the last time it was called."""
247
247
248 self._update_status()
248 self._update_status()
249 new_comp = self._group_report(self._comp_report,'Completed')
249 new_comp = self._group_report(self._comp_report,'Completed')
250 new_dead = self._group_report(self._dead_report,
250 new_dead = self._group_report(self._dead_report,
251 'Dead, call job.traceback() for details')
251 'Dead, call jobs.traceback() for details')
252 self._comp_report[:] = []
252 self._comp_report[:] = []
253 self._dead_report[:] = []
253 self._dead_report[:] = []
254 return new_comp or new_dead
254 return new_comp or new_dead
255
255
256 def status(self,verbose=0):
256 def status(self,verbose=0):
257 """Print a status of all jobs currently being managed."""
257 """Print a status of all jobs currently being managed."""
258
258
259 self._update_status()
259 self._update_status()
260 self._group_report(self.jobs_run,'Running')
260 self._group_report(self.jobs_run,'Running')
261 self._group_report(self.jobs_comp,'Completed')
261 self._group_report(self.jobs_comp,'Completed')
262 self._group_report(self.jobs_dead,'Dead')
262 self._group_report(self.jobs_dead,'Dead')
263 # Also flush the report queues
263 # Also flush the report queues
264 self._comp_report[:] = []
264 self._comp_report[:] = []
265 self._dead_report[:] = []
265 self._dead_report[:] = []
266
266
267 def remove(self,num):
267 def remove(self,num):
268 """Remove a finished (completed or dead) job."""
268 """Remove a finished (completed or dead) job."""
269
269
270 try:
270 try:
271 job = self.jobs_all[num]
271 job = self.jobs_all[num]
272 except KeyError:
272 except KeyError:
273 error('Job #%s not found' % num)
273 error('Job #%s not found' % num)
274 else:
274 else:
275 stat_code = job.stat_code
275 stat_code = job.stat_code
276 if stat_code == self._s_running:
276 if stat_code == self._s_running:
277 error('Job #%s is still running, it can not be removed.' % num)
277 error('Job #%s is still running, it can not be removed.' % num)
278 return
278 return
279 elif stat_code == self._s_completed:
279 elif stat_code == self._s_completed:
280 self.jobs_comp.remove(job)
280 self.jobs_comp.remove(job)
281 elif stat_code == self._s_dead:
281 elif stat_code == self._s_dead:
282 self.jobs_dead.remove(job)
282 self.jobs_dead.remove(job)
283
283
284 def flush_finished(self):
284 def flush_finished(self):
285 """Flush all jobs finished (completed and dead) from lists.
285 """Flush all jobs finished (completed and dead) from lists.
286
286
287 Running jobs are never flushed.
287 Running jobs are never flushed.
288
288
289 It first calls _status_new(), to update info. If any jobs have
289 It first calls _status_new(), to update info. If any jobs have
290 completed since the last _status_new() call, the flush operation
290 completed since the last _status_new() call, the flush operation
291 aborts."""
291 aborts."""
292
292
293 if self._status_new():
293 if self._status_new():
294 error('New jobs completed since last '\
294 error('New jobs completed since last '\
295 '_status_new(), aborting flush.')
295 '_status_new(), aborting flush.')
296 return
296 return
297
297
298 # Remove the finished jobs from the master dict
298 # Remove the finished jobs from the master dict
299 jobs_all = self.jobs_all
299 jobs_all = self.jobs_all
300 for job in self.jobs_comp+self.jobs_dead:
300 for job in self.jobs_comp+self.jobs_dead:
301 del(jobs_all[job.num])
301 del(jobs_all[job.num])
302
302
303 # Now flush these lists completely
303 # Now flush these lists completely
304 fl_comp = self._group_flush(self.jobs_comp,'Completed')
304 fl_comp = self._group_flush(self.jobs_comp,'Completed')
305 fl_dead = self._group_flush(self.jobs_dead,'Dead')
305 fl_dead = self._group_flush(self.jobs_dead,'Dead')
306 if not (fl_comp or fl_dead):
306 if not (fl_comp or fl_dead):
307 print 'No jobs to flush.'
307 print 'No jobs to flush.'
308
308
309 def result(self,num):
309 def result(self,num):
310 """result(N) -> return the result of job N."""
310 """result(N) -> return the result of job N."""
311 try:
311 try:
312 return self.jobs_all[num].result
312 return self.jobs_all[num].result
313 except KeyError:
313 except KeyError:
314 error('Job #%s not found' % num)
314 error('Job #%s not found' % num)
315
315
316 def traceback(self,num):
316 def traceback(self,num):
317 try:
317 try:
318 self.jobs_all[num].traceback()
318 self.jobs_all[num].traceback()
319 except KeyError:
319 except KeyError:
320 error('Job #%s not found' % num)
320 error('Job #%s not found' % num)
321
321
322
322
323 class BackgroundJobBase(threading.Thread):
323 class BackgroundJobBase(threading.Thread):
324 """Base class to build BackgroundJob classes.
324 """Base class to build BackgroundJob classes.
325
325
326 The derived classes must implement:
326 The derived classes must implement:
327
327
328 - Their own __init__, since the one here raises NotImplementedError. The
328 - Their own __init__, since the one here raises NotImplementedError. The
329 derived constructor must call self._init() at the end, to provide common
329 derived constructor must call self._init() at the end, to provide common
330 initialization.
330 initialization.
331
331
332 - A strform attribute used in calls to __str__.
332 - A strform attribute used in calls to __str__.
333
333
334 - A call() method, which will make the actual execution call and must
334 - A call() method, which will make the actual execution call and must
335 return a value to be held in the 'result' field of the job object."""
335 return a value to be held in the 'result' field of the job object."""
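[Reviewer note] To make the contract above concrete, a hypothetical minimal subclass might look like this (BackgroundJobSleep is invented for illustration, not part of this module):

    import time

    class BackgroundJobSleep(BackgroundJobBase):
        """Toy job: sleep for a given interval, then return it."""

        def __init__(self, interval):
            self.interval = interval
            # strform is required by _init(), which asserts its presence.
            self.strform = 'sleep(%s)' % interval
            # Derived constructors must end with the common initialization.
            self._init()

        def call(self):
            # Whatever call() returns is stored in self.result by run().
            time.sleep(self.interval)
            return self.interval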
336
336
337 # Class constants for status, in string and as numerical codes (when
337 # Class constants for status, in string and as numerical codes (when
338 # updating jobs lists, we don't want to do string comparisons). This will
338 # updating jobs lists, we don't want to do string comparisons). This will
339 # be done at every user prompt, so it has to be as fast as possible
339 # be done at every user prompt, so it has to be as fast as possible
340 stat_created = 'Created'; stat_created_c = 0
340 stat_created = 'Created'; stat_created_c = 0
341 stat_running = 'Running'; stat_running_c = 1
341 stat_running = 'Running'; stat_running_c = 1
342 stat_completed = 'Completed'; stat_completed_c = 2
342 stat_completed = 'Completed'; stat_completed_c = 2
343 stat_dead = 'Dead (Exception), call job.traceback() for details'
343 stat_dead = 'Dead (Exception), call jobs.traceback() for details'
344 stat_dead_c = -1
344 stat_dead_c = -1
345
345
346 def __init__(self):
346 def __init__(self):
347 raise NotImplementedError, \
347 raise NotImplementedError, \
348 "This class can not be instantiated directly."
348 "This class can not be instantiated directly."
349
349
350 def _init(self):
350 def _init(self):
351 """Common initialization for all BackgroundJob objects"""
351 """Common initialization for all BackgroundJob objects"""
352
352
353 for attr in ['call','strform']:
353 for attr in ['call','strform']:
354 assert hasattr(self,attr), "Missing attribute <%s>" % attr
354 assert hasattr(self,attr), "Missing attribute <%s>" % attr
355
355
356 # The num tag can be set by an external job manager
356 # The num tag can be set by an external job manager
357 self.num = None
357 self.num = None
358
358
359 self.status = BackgroundJobBase.stat_created
359 self.status = BackgroundJobBase.stat_created
360 self.stat_code = BackgroundJobBase.stat_created_c
360 self.stat_code = BackgroundJobBase.stat_created_c
361 self.finished = False
361 self.finished = False
362 self.result = '<BackgroundJob has not completed>'
362 self.result = '<BackgroundJob has not completed>'
363 # reuse the ipython traceback handler if we can get to it, otherwise
363 # reuse the ipython traceback handler if we can get to it, otherwise
364 # make a new one
364 # make a new one
365 try:
365 try:
366 self._make_tb = __IPYTHON__.InteractiveTB.text
366 self._make_tb = __IPYTHON__.InteractiveTB.text
367 except:
367 except:
368 self._make_tb = AutoFormattedTB(mode = 'Context',
368 self._make_tb = AutoFormattedTB(mode = 'Context',
369 color_scheme='NoColor',
369 color_scheme='NoColor',
370 tb_offset = 1).text
370 tb_offset = 1).text
371 # Hold a formatted traceback if one is generated.
371 # Hold a formatted traceback if one is generated.
372 self._tb = None
372 self._tb = None
373
373
374 threading.Thread.__init__(self)
374 threading.Thread.__init__(self)
375
375
376 def __str__(self):
376 def __str__(self):
377 return self.strform
377 return self.strform
378
378
379 def __repr__(self):
379 def __repr__(self):
380 return '<BackgroundJob: %s>' % self.strform
380 return '<BackgroundJob: %s>' % self.strform
381
381
382 def traceback(self):
382 def traceback(self):
383 print self._tb
383 print self._tb
384
384
385 def run(self):
385 def run(self):
386 try:
386 try:
387 self.status = BackgroundJobBase.stat_running
387 self.status = BackgroundJobBase.stat_running
388 self.stat_code = BackgroundJobBase.stat_running_c
388 self.stat_code = BackgroundJobBase.stat_running_c
389 self.result = self.call()
389 self.result = self.call()
390 except:
390 except:
391 self.status = BackgroundJobBase.stat_dead
391 self.status = BackgroundJobBase.stat_dead
392 self.stat_code = BackgroundJobBase.stat_dead_c
392 self.stat_code = BackgroundJobBase.stat_dead_c
393 self.finished = None
393 self.finished = None
394 self.result = ('<BackgroundJob died, call job.traceback() for details>')
394 self.result = ('<BackgroundJob died, call jobs.traceback() for details>')
395 self._tb = self._make_tb()
395 self._tb = self._make_tb()
396 else:
396 else:
397 self.status = BackgroundJobBase.stat_completed
397 self.status = BackgroundJobBase.stat_completed
398 self.stat_code = BackgroundJobBase.stat_completed_c
398 self.stat_code = BackgroundJobBase.stat_completed_c
399 self.finished = True
399 self.finished = True
400
400
401 class BackgroundJobExpr(BackgroundJobBase):
401 class BackgroundJobExpr(BackgroundJobBase):
402 """Evaluate an expression as a background job (uses a separate thread)."""
402 """Evaluate an expression as a background job (uses a separate thread)."""
403
403
404 def __init__(self,expression,glob=None,loc=None):
404 def __init__(self,expression,glob=None,loc=None):
405 """Create a new job from a string which can be fed to eval().
405 """Create a new job from a string which can be fed to eval().
406
406
407 global/locals dicts can be provided, which will be passed to the eval
407 global/locals dicts can be provided, which will be passed to the eval
408 call."""
408 call."""
409
409
410 # fail immediately if the given expression can't be compiled
410 # fail immediately if the given expression can't be compiled
411 self.code = compile(expression,'<BackgroundJob compilation>','eval')
411 self.code = compile(expression,'<BackgroundJob compilation>','eval')
412
412
413 if glob is None:
413 if glob is None:
414 glob = {}
414 glob = {}
415 if loc is None:
415 if loc is None:
416 loc = {}
416 loc = {}
417
417
418 self.expression = self.strform = expression
418 self.expression = self.strform = expression
419 self.glob = glob
419 self.glob = glob
420 self.loc = loc
420 self.loc = loc
421 self._init()
421 self._init()
422
422
423 def call(self):
423 def call(self):
424 return eval(self.code,self.glob,self.loc)
424 return eval(self.code,self.glob,self.loc)
425
425
426 class BackgroundJobFunc(BackgroundJobBase):
426 class BackgroundJobFunc(BackgroundJobBase):
427 """Run a function call as a background job (uses a separate thread)."""
427 """Run a function call as a background job (uses a separate thread)."""
428
428
429 def __init__(self,func,*args,**kwargs):
429 def __init__(self,func,*args,**kwargs):
430 """Create a new job from a callable object.
430 """Create a new job from a callable object.
431
431
432 Any positional arguments and keyword args given to this constructor
432 Any positional arguments and keyword args given to this constructor
433 after the initial callable are passed directly to it."""
433 after the initial callable are passed directly to it."""
434
434
435 assert callable(func),'first argument must be callable'
435 assert callable(func),'first argument must be callable'
436
436
437 if args is None:
437 if args is None:
438 args = []
438 args = []
439 if kwargs is None:
439 if kwargs is None:
440 kwargs = {}
440 kwargs = {}
441
441
442 self.func = func
442 self.func = func
443 self.args = args
443 self.args = args
444 self.kwargs = kwargs
444 self.kwargs = kwargs
445 # The string form will only include the function passed, because
445 # The string form will only include the function passed, because
446 # generating string representations of the arguments is a potentially
446 # generating string representations of the arguments is a potentially
447 # _very_ expensive operation (e.g. with large arrays).
447 # _very_ expensive operation (e.g. with large arrays).
448 self.strform = str(func)
448 self.strform = str(func)
449 self._init()
449 self._init()
450
450
451 def call(self):
451 def call(self):
452 return self.func(*self.args,**self.kwargs)
452 return self.func(*self.args,**self.kwargs)
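[Reviewer note] Both concrete job classes are ordinary threading.Thread subclasses and can also be driven directly, without a manager; a rough sketch:

    job = BackgroundJobFunc(sum, [1, 2, 3])
    job.start()                  # inherited from threading.Thread; runs call()
    job.join()
    print job.status             # 'Completed', or the 'Dead (Exception)...' message
    print job.result             # 6, or the '<BackgroundJob died...>' placeholder

    expr_job = BackgroundJobExpr('2 + 3', globals(), locals())
    expr_job.start(); expr_job.join()
    print expr_job.result        # 5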
453
453
454
454
455 if __name__=='__main__':
455 if __name__=='__main__':
456
456
457 import time
457 import time
458
458
459 def sleepfunc(interval=2,*a,**kw):
459 def sleepfunc(interval=2,*a,**kw):
460 args = dict(interval=interval,
460 args = dict(interval=interval,
461 args=a,
461 args=a,
462 kwargs=kw)
462 kwargs=kw)
463 time.sleep(interval)
463 time.sleep(interval)
464 return args
464 return args
465
465
466 def diefunc(interval=2,*a,**kw):
466 def diefunc(interval=2,*a,**kw):
467 time.sleep(interval)
467 time.sleep(interval)
468 die
468 die
469
469
470 def printfunc(interval=1,reps=5):
470 def printfunc(interval=1,reps=5):
471 for n in range(reps):
471 for n in range(reps):
472 time.sleep(interval)
472 time.sleep(interval)
473 print 'In the background...'
473 print 'In the background...'
474
474
475 jobs = BackgroundJobManager()
475 jobs = BackgroundJobManager()
476 # first job will have # 0
476 # first job will have # 0
477 jobs.new(sleepfunc,4)
477 jobs.new(sleepfunc,4)
478 jobs.new(sleepfunc,kw={'reps':2})
478 jobs.new(sleepfunc,kw={'reps':2})
479 # This makes a job which will die
479 # This makes a job which will die
480 jobs.new(diefunc,1)
480 jobs.new(diefunc,1)
481 jobs.new('printfunc(1,3)')
481 jobs.new('printfunc(1,3)')
482
482
483 # after a while, you can get the traceback of a dead job. Run the line
483 # after a while, you can get the traceback of a dead job. Run the line
484 # below again interactively until it prints a traceback (check the status
484 # below again interactively until it prints a traceback (check the status
485 # of the job):
485 # of the job):
486 print jobs[1].status
486 print jobs[1].status
487 jobs[1].traceback()
487 jobs[1].traceback()
488
488
489 # Run this line again until the printed result changes
489 # Run this line again until the printed result changes
490 print "The result of job #0 is:",jobs[0].result
490 print "The result of job #0 is:",jobs[0].result