Show More
@@ -17,6 +17,9 b' http://folk.uio.no/hpl/scripting' | |||||
17 |
|
17 | |||
18 | (although ultimately no code from this text was used, as IPython's system is a |
|
18 | (although ultimately no code from this text was used, as IPython's system is a | |
19 | separate implementation). |
|
19 | separate implementation). | |
|
20 | ||||
|
21 | An example notebook is provided in our documentation illustrating interactive | |||
|
22 | use of the system. | |||
20 | """ |
|
23 | """ | |
21 |
|
24 | |||
22 | #***************************************************************************** |
|
25 | #***************************************************************************** | |
@@ -33,7 +36,8 b' import threading' | |||||
33 | from IPython.core.ultratb import AutoFormattedTB |
|
36 | from IPython.core.ultratb import AutoFormattedTB | |
34 | from IPython.utils.warn import warn, error |
|
37 | from IPython.utils.warn import warn, error | |
35 |
|
38 | |||
36 | class BackgroundJobManager: |
|
39 | ||
|
40 | class BackgroundJobManager(object): | |||
37 | """Class to manage a pool of backgrounded threaded jobs. |
|
41 | """Class to manage a pool of backgrounded threaded jobs. | |
38 |
|
42 | |||
39 | Below, we assume that 'jobs' is a BackgroundJobManager instance. |
|
43 | Below, we assume that 'jobs' is a BackgroundJobManager instance. | |
@@ -52,7 +56,7 b' class BackgroundJobManager:' | |||||
52 |
|
56 | |||
53 | jobs.remove(N) -> remove (finished) job N |
|
57 | jobs.remove(N) -> remove (finished) job N | |
54 |
|
58 | |||
55 |
jobs.flush |
|
59 | jobs.flush() -> remove all finished jobs | |
56 |
|
60 | |||
57 | As a convenience feature, BackgroundJobManager instances provide the |
|
61 | As a convenience feature, BackgroundJobManager instances provide the | |
58 | utility result and traceback methods which retrieve the corresponding |
|
62 | utility result and traceback methods which retrieve the corresponding | |
@@ -63,17 +67,15 b' class BackgroundJobManager:' | |||||
63 |
|
67 | |||
64 | While this appears minor, it allows you to use tab completion |
|
68 | While this appears minor, it allows you to use tab completion | |
65 | interactively on the job manager instance. |
|
69 | interactively on the job manager instance. | |
66 |
|
70 | """ | ||
67 | In interactive mode, IPython provides the magic fuction %bg for quick |
|
|||
68 | creation of backgrounded expression-based jobs. Type bg? for details.""" |
|
|||
69 |
|
71 | |||
70 | def __init__(self): |
|
72 | def __init__(self): | |
71 | # Lists for job management |
|
73 | # Lists for job management | |
72 |
self. |
|
74 | self.running = [] | |
73 |
self. |
|
75 | self.completed = [] | |
74 |
self. |
|
76 | self.dead = [] | |
75 | # A dict of all jobs, so users can easily access any of them |
|
77 | # A dict of all jobs, so users can easily access any of them | |
76 |
self. |
|
78 | self.all = {} | |
77 | # For reporting |
|
79 | # For reporting | |
78 | self._comp_report = [] |
|
80 | self._comp_report = [] | |
79 | self._dead_report = [] |
|
81 | self._dead_report = [] | |
@@ -149,7 +151,7 b' class BackgroundJobManager:' | |||||
149 | if callable(func_or_exp): |
|
151 | if callable(func_or_exp): | |
150 | kw = kwargs.get('kw',{}) |
|
152 | kw = kwargs.get('kw',{}) | |
151 | job = BackgroundJobFunc(func_or_exp,*args,**kw) |
|
153 | job = BackgroundJobFunc(func_or_exp,*args,**kw) | |
152 | elif isinstance(func_or_exp,basestring): |
|
154 | elif isinstance(func_or_exp, basestring): | |
153 | if not args: |
|
155 | if not args: | |
154 | frame = sys._getframe(1) |
|
156 | frame = sys._getframe(1) | |
155 | glob, loc = frame.f_globals, frame.f_locals |
|
157 | glob, loc = frame.f_globals, frame.f_locals | |
@@ -158,30 +160,28 b' class BackgroundJobManager:' | |||||
158 | elif len(args)==2: |
|
160 | elif len(args)==2: | |
159 | glob,loc = args |
|
161 | glob,loc = args | |
160 | else: |
|
162 | else: | |
161 |
raise ValueError |
|
163 | raise ValueError( | |
162 | 'Expression jobs take at most 2 args (globals,locals)' |
|
164 | 'Expression jobs take at most 2 args (globals,locals)') | |
163 | job = BackgroundJobExpr(func_or_exp,glob,loc) |
|
165 | job = BackgroundJobExpr(func_or_exp, glob, loc) | |
164 | else: |
|
|||
165 | raise |
|
|||
166 | jkeys = self.jobs_all.keys() |
|
|||
167 | if jkeys: |
|
|||
168 | job.num = max(jkeys)+1 |
|
|||
169 | else: |
|
166 | else: | |
170 | job.num = 0 |
|
167 | raise TypeError('invalid args for new job') | |
171 | self.jobs_run.append(job) |
|
168 | ||
172 | self.jobs_all[job.num] = job |
|
169 | job.num = len(self.all)+1 if self.all else 0 | |
|
170 | self.running.append(job) | |||
|
171 | self.all[job.num] = job | |||
173 | print 'Starting job # %s in a separate thread.' % job.num |
|
172 | print 'Starting job # %s in a separate thread.' % job.num | |
174 | job.start() |
|
173 | job.start() | |
175 | return job |
|
174 | return job | |
176 |
|
175 | |||
177 | def __getitem__(self,key): |
|
176 | def __getitem__(self, job_key): | |
178 | return self.jobs_all[key] |
|
177 | num = job_key if isinstance(job_key, int) else job_key.num | |
|
178 | return self.all[num] | |||
179 |
|
179 | |||
180 | def __call__(self): |
|
180 | def __call__(self): | |
181 | """An alias to self.status(), |
|
181 | """An alias to self.status(), | |
182 |
|
182 | |||
183 | This allows you to simply call a job manager instance much like the |
|
183 | This allows you to simply call a job manager instance much like the | |
184 | Unix jobs shell command.""" |
|
184 | Unix `jobs` shell command.""" | |
185 |
|
185 | |||
186 | return self.status() |
|
186 | return self.status() | |
187 |
|
187 | |||
@@ -189,29 +189,29 b' class BackgroundJobManager:' | |||||
189 | """Update the status of the job lists. |
|
189 | """Update the status of the job lists. | |
190 |
|
190 | |||
191 | This method moves finished jobs to one of two lists: |
|
191 | This method moves finished jobs to one of two lists: | |
192 |
- self. |
|
192 | - self.completed: jobs which completed successfully | |
193 |
- self. |
|
193 | - self.dead: jobs which finished but died. | |
194 |
|
194 | |||
195 | It also copies those jobs to corresponding _report lists. These lists |
|
195 | It also copies those jobs to corresponding _report lists. These lists | |
196 | are used to report jobs completed/dead since the last update, and are |
|
196 | are used to report jobs completed/dead since the last update, and are | |
197 | then cleared by the reporting function after each call.""" |
|
197 | then cleared by the reporting function after each call.""" | |
198 |
|
198 | |||
199 | run,comp,dead = self._s_running,self._s_completed,self._s_dead |
|
199 | run,comp,dead = self._s_running,self._s_completed,self._s_dead | |
200 |
|
|
200 | running = self.running | |
201 |
for num in range(len( |
|
201 | for num in range(len(running)): | |
202 |
job = |
|
202 | job = running[num] | |
203 | stat = job.stat_code |
|
203 | stat = job.stat_code | |
204 | if stat == run: |
|
204 | if stat == run: | |
205 | continue |
|
205 | continue | |
206 | elif stat == comp: |
|
206 | elif stat == comp: | |
207 |
self. |
|
207 | self.completed.append(job) | |
208 | self._comp_report.append(job) |
|
208 | self._comp_report.append(job) | |
209 |
|
|
209 | running[num] = False | |
210 | elif stat == dead: |
|
210 | elif stat == dead: | |
211 |
self. |
|
211 | self.dead.append(job) | |
212 | self._dead_report.append(job) |
|
212 | self._dead_report.append(job) | |
213 |
|
|
213 | running[num] = False | |
214 |
self. |
|
214 | self.running = filter(None,self.running) | |
215 |
|
215 | |||
216 | def _group_report(self,group,name): |
|
216 | def _group_report(self,group,name): | |
217 | """Report summary for a given job group. |
|
217 | """Report summary for a given job group. | |
@@ -246,7 +246,7 b' class BackgroundJobManager:' | |||||
246 | which have finished since the last time it was called.""" |
|
246 | which have finished since the last time it was called.""" | |
247 |
|
247 | |||
248 | self._update_status() |
|
248 | self._update_status() | |
249 | new_comp = self._group_report(self._comp_report,'Completed') |
|
249 | new_comp = self._group_report(self._comp_report, 'Completed') | |
250 | new_dead = self._group_report(self._dead_report, |
|
250 | new_dead = self._group_report(self._dead_report, | |
251 | 'Dead, call jobs.traceback() for details') |
|
251 | 'Dead, call jobs.traceback() for details') | |
252 | self._comp_report[:] = [] |
|
252 | self._comp_report[:] = [] | |
@@ -257,9 +257,9 b' class BackgroundJobManager:' | |||||
257 | """Print a status of all jobs currently being managed.""" |
|
257 | """Print a status of all jobs currently being managed.""" | |
258 |
|
258 | |||
259 | self._update_status() |
|
259 | self._update_status() | |
260 |
self._group_report(self. |
|
260 | self._group_report(self.running,'Running') | |
261 |
self._group_report(self. |
|
261 | self._group_report(self.completed,'Completed') | |
262 |
self._group_report(self. |
|
262 | self._group_report(self.dead,'Dead') | |
263 | # Also flush the report queues |
|
263 | # Also flush the report queues | |
264 | self._comp_report[:] = [] |
|
264 | self._comp_report[:] = [] | |
265 | self._dead_report[:] = [] |
|
265 | self._dead_report[:] = [] | |
@@ -268,7 +268,7 b' class BackgroundJobManager:' | |||||
268 | """Remove a finished (completed or dead) job.""" |
|
268 | """Remove a finished (completed or dead) job.""" | |
269 |
|
269 | |||
270 | try: |
|
270 | try: | |
271 |
job = self. |
|
271 | job = self.all[num] | |
272 | except KeyError: |
|
272 | except KeyError: | |
273 | error('Job #%s not found' % num) |
|
273 | error('Job #%s not found' % num) | |
274 | else: |
|
274 | else: | |
@@ -277,12 +277,12 b' class BackgroundJobManager:' | |||||
277 | error('Job #%s is still running, it can not be removed.' % num) |
|
277 | error('Job #%s is still running, it can not be removed.' % num) | |
278 | return |
|
278 | return | |
279 | elif stat_code == self._s_completed: |
|
279 | elif stat_code == self._s_completed: | |
280 |
self. |
|
280 | self.completed.remove(job) | |
281 | elif stat_code == self._s_dead: |
|
281 | elif stat_code == self._s_dead: | |
282 |
self. |
|
282 | self.dead.remove(job) | |
283 |
|
283 | |||
284 |
def flush |
|
284 | def flush(self): | |
285 |
"""Flush all |
|
285 | """Flush all finished jobs (completed and dead) from lists. | |
286 |
|
286 | |||
287 | Running jobs are never flushed. |
|
287 | Running jobs are never flushed. | |
288 |
|
288 | |||
@@ -296,29 +296,40 b' class BackgroundJobManager:' | |||||
296 | return |
|
296 | return | |
297 |
|
297 | |||
298 | # Remove the finished jobs from the master dict |
|
298 | # Remove the finished jobs from the master dict | |
299 |
|
|
299 | all = self.all | |
300 |
for job in self. |
|
300 | for job in self.completed+self.dead: | |
301 |
del( |
|
301 | del(all[job.num]) | |
302 |
|
302 | |||
303 | # Now flush these lists completely |
|
303 | # Now flush these lists completely | |
304 |
fl_comp = self._group_flush(self. |
|
304 | fl_comp = self._group_flush(self.completed, 'Completed') | |
305 |
fl_dead = self._group_flush(self. |
|
305 | fl_dead = self._group_flush(self.dead, 'Dead') | |
306 | if not (fl_comp or fl_dead): |
|
306 | if not (fl_comp or fl_dead): | |
307 | print 'No jobs to flush.' |
|
307 | print 'No jobs to flush.' | |
308 |
|
308 | |||
309 | def result(self,num): |
|
309 | def result(self,num): | |
310 | """result(N) -> return the result of job N.""" |
|
310 | """result(N) -> return the result of job N.""" | |
311 | try: |
|
311 | try: | |
312 |
return self. |
|
312 | return self.all[num].result | |
313 | except KeyError: |
|
313 | except KeyError: | |
314 | error('Job #%s not found' % num) |
|
314 | error('Job #%s not found' % num) | |
315 |
|
315 | |||
316 |
def traceback(self, |
|
316 | def _traceback(self, job): | |
|
317 | num = job if isinstance(job, int) else job.num | |||
317 | try: |
|
318 | try: | |
318 |
self. |
|
319 | self.all[num].traceback() | |
319 | except KeyError: |
|
320 | except KeyError: | |
320 | error('Job #%s not found' % num) |
|
321 | error('Job #%s not found' % num) | |
321 |
|
322 | |||
|
323 | def traceback(self, job=None): | |||
|
324 | if job is None: | |||
|
325 | self._update_status() | |||
|
326 | for deadjob in self.dead: | |||
|
327 | print "Traceback for: %r" % deadjob | |||
|
328 | self._traceback(deadjob) | |||
|
329 | ||||
|
330 | else: | |||
|
331 | self._traceback(job) | |||
|
332 | ||||
322 |
|
333 | |||
323 | class BackgroundJobBase(threading.Thread): |
|
334 | class BackgroundJobBase(threading.Thread): | |
324 | """Base class to build BackgroundJob classes. |
|
335 | """Base class to build BackgroundJob classes. | |
@@ -360,6 +371,7 b' class BackgroundJobBase(threading.Thread):' | |||||
360 | self.stat_code = BackgroundJobBase.stat_created_c |
|
371 | self.stat_code = BackgroundJobBase.stat_created_c | |
361 | self.finished = False |
|
372 | self.finished = False | |
362 | self.result = '<BackgroundJob has not completed>' |
|
373 | self.result = '<BackgroundJob has not completed>' | |
|
374 | ||||
363 | # reuse the ipython traceback handler if we can get to it, otherwise |
|
375 | # reuse the ipython traceback handler if we can get to it, otherwise | |
364 | # make a new one |
|
376 | # make a new one | |
365 | try: |
|
377 | try: | |
@@ -368,7 +380,10 b' class BackgroundJobBase(threading.Thread):' | |||||
368 | make_tb = AutoFormattedTB(mode = 'Context', |
|
380 | make_tb = AutoFormattedTB(mode = 'Context', | |
369 | color_scheme='NoColor', |
|
381 | color_scheme='NoColor', | |
370 | tb_offset = 1).text |
|
382 | tb_offset = 1).text | |
|
383 | # Note that the actual API for text() requires the three args to be | |||
|
384 | # passed in, so we wrap it in a simple lambda. | |||
371 | self._make_tb = lambda : make_tb(None, None, None) |
|
385 | self._make_tb = lambda : make_tb(None, None, None) | |
|
386 | ||||
372 | # Hold a formatted traceback if one is generated. |
|
387 | # Hold a formatted traceback if one is generated. | |
373 | self._tb = None |
|
388 | self._tb = None | |
374 |
|
389 | |||
@@ -378,7 +393,7 b' class BackgroundJobBase(threading.Thread):' | |||||
378 | return self.strform |
|
393 | return self.strform | |
379 |
|
394 | |||
380 | def __repr__(self): |
|
395 | def __repr__(self): | |
381 | return '<BackgroundJob: %s>' % self.strform |
|
396 | return '<BackgroundJob #%d: %s>' % (self.num, self.strform) | |
382 |
|
397 | |||
383 | def traceback(self): |
|
398 | def traceback(self): | |
384 | print self._tb |
|
399 | print self._tb | |
@@ -399,10 +414,11 b' class BackgroundJobBase(threading.Thread):' | |||||
399 | self.stat_code = BackgroundJobBase.stat_completed_c |
|
414 | self.stat_code = BackgroundJobBase.stat_completed_c | |
400 | self.finished = True |
|
415 | self.finished = True | |
401 |
|
416 | |||
|
417 | ||||
402 | class BackgroundJobExpr(BackgroundJobBase): |
|
418 | class BackgroundJobExpr(BackgroundJobBase): | |
403 | """Evaluate an expression as a background job (uses a separate thread).""" |
|
419 | """Evaluate an expression as a background job (uses a separate thread).""" | |
404 |
|
420 | |||
405 | def __init__(self,expression,glob=None,loc=None): |
|
421 | def __init__(self, expression, glob=None, loc=None): | |
406 | """Create a new job from a string which can be fed to eval(). |
|
422 | """Create a new job from a string which can be fed to eval(). | |
407 |
|
423 | |||
408 | global/locals dicts can be provided, which will be passed to the eval |
|
424 | global/locals dicts can be provided, which will be passed to the eval | |
@@ -411,11 +427,8 b' class BackgroundJobExpr(BackgroundJobBase):' | |||||
411 | # fail immediately if the given expression can't be compiled |
|
427 | # fail immediately if the given expression can't be compiled | |
412 | self.code = compile(expression,'<BackgroundJob compilation>','eval') |
|
428 | self.code = compile(expression,'<BackgroundJob compilation>','eval') | |
413 |
|
429 | |||
414 |
if glob is None |
|
430 | glob = {} if glob is None else glob | |
415 | glob = {} |
|
431 | loc = {} if loc is None else loc | |
416 | if loc is None: |
|
|||
417 | loc = {} |
|
|||
418 |
|
||||
419 | self.expression = self.strform = expression |
|
432 | self.expression = self.strform = expression | |
420 | self.glob = glob |
|
433 | self.glob = glob | |
421 | self.loc = loc |
|
434 | self.loc = loc | |
@@ -424,21 +437,19 b' class BackgroundJobExpr(BackgroundJobBase):' | |||||
424 | def call(self): |
|
437 | def call(self): | |
425 | return eval(self.code,self.glob,self.loc) |
|
438 | return eval(self.code,self.glob,self.loc) | |
426 |
|
439 | |||
|
440 | ||||
427 | class BackgroundJobFunc(BackgroundJobBase): |
|
441 | class BackgroundJobFunc(BackgroundJobBase): | |
428 | """Run a function call as a background job (uses a separate thread).""" |
|
442 | """Run a function call as a background job (uses a separate thread).""" | |
429 |
|
443 | |||
430 | def __init__(self,func,*args,**kwargs): |
|
444 | def __init__(self, func, *args, **kwargs): | |
431 | """Create a new job from a callable object. |
|
445 | """Create a new job from a callable object. | |
432 |
|
446 | |||
433 | Any positional arguments and keyword args given to this constructor |
|
447 | Any positional arguments and keyword args given to this constructor | |
434 | after the initial callable are passed directly to it.""" |
|
448 | after the initial callable are passed directly to it.""" | |
435 |
|
449 | |||
436 | assert callable(func),'first argument must be callable' |
|
450 | if not callable(func): | |
437 |
|
451 | raise TypeError( | ||
438 | if args is None: |
|
452 | 'first argument to BackgroundJobFunc must be callable') | |
439 | args = [] |
|
|||
440 | if kwargs is None: |
|
|||
441 | kwargs = {} |
|
|||
442 |
|
453 | |||
443 | self.func = func |
|
454 | self.func = func | |
444 | self.args = args |
|
455 | self.args = args | |
@@ -450,44 +461,4 b' class BackgroundJobFunc(BackgroundJobBase):' | |||||
450 | self._init() |
|
461 | self._init() | |
451 |
|
462 | |||
452 | def call(self): |
|
463 | def call(self): | |
453 | return self.func(*self.args,**self.kwargs) |
|
464 | return self.func(*self.args, **self.kwargs) | |
454 |
|
||||
455 |
|
||||
456 | if __name__=='__main__': |
|
|||
457 |
|
||||
458 | import sys |
|
|||
459 | import time |
|
|||
460 |
|
||||
461 | def sleepfunc(interval=2,*a,**kw): |
|
|||
462 | args = dict(interval=interval, |
|
|||
463 | args=a, |
|
|||
464 | kwargs=kw) |
|
|||
465 | time.sleep(interval) |
|
|||
466 | return args |
|
|||
467 |
|
||||
468 | def diefunc(interval=2,*a,**kw): |
|
|||
469 | time.sleep(interval) |
|
|||
470 | die |
|
|||
471 |
|
||||
472 | def printfunc(interval=1,reps=5): |
|
|||
473 | for n in range(reps): |
|
|||
474 | time.sleep(interval) |
|
|||
475 | print 'In the background...', n |
|
|||
476 | sys.stdout.flush() |
|
|||
477 |
|
||||
478 | jobs = BackgroundJobManager() |
|
|||
479 | # first job will have # 0 |
|
|||
480 | jobs.new(sleepfunc,4) |
|
|||
481 | jobs.new(sleepfunc,kw={'reps':2}) |
|
|||
482 | # This makes a job which will die |
|
|||
483 | jobs.new(diefunc,1) |
|
|||
484 | jobs.new('printfunc(1,3)') |
|
|||
485 |
|
||||
486 | # after a while, you can get the traceback of a dead job. Run the line |
|
|||
487 | # below again interactively until it prints a traceback (check the status |
|
|||
488 | # of the job): |
|
|||
489 | print jobs[1].status |
|
|||
490 | jobs[1].traceback() |
|
|||
491 |
|
||||
492 | # Run this line again until the printed result changes |
|
|||
493 | print "The result of job #0 is:",jobs[0].result |
|
General Comments 0
You need to be logged in to leave comments.
Login now