##// END OF EJS Templates
Cleanup the API of backgroundjobs to be more convenient to use....
Fernando Perez -
Show More
@@ -17,6 +17,9 b' http://folk.uio.no/hpl/scripting'
17
17
18 (although ultimately no code from this text was used, as IPython's system is a
18 (although ultimately no code from this text was used, as IPython's system is a
19 separate implementation).
19 separate implementation).
20
21 An example notebook is provided in our documentation illustrating interactive
22 use of the system.
20 """
23 """
21
24
22 #*****************************************************************************
25 #*****************************************************************************
@@ -33,7 +36,8 b' import threading'
33 from IPython.core.ultratb import AutoFormattedTB
36 from IPython.core.ultratb import AutoFormattedTB
34 from IPython.utils.warn import warn, error
37 from IPython.utils.warn import warn, error
35
38
36 class BackgroundJobManager:
39
40 class BackgroundJobManager(object):
37 """Class to manage a pool of backgrounded threaded jobs.
41 """Class to manage a pool of backgrounded threaded jobs.
38
42
39 Below, we assume that 'jobs' is a BackgroundJobManager instance.
43 Below, we assume that 'jobs' is a BackgroundJobManager instance.
@@ -52,7 +56,7 b' class BackgroundJobManager:'
52
56
53 jobs.remove(N) -> remove (finished) job N
57 jobs.remove(N) -> remove (finished) job N
54
58
55 jobs.flush_finished() -> remove all finished jobs
59 jobs.flush() -> remove all finished jobs
56
60
57 As a convenience feature, BackgroundJobManager instances provide the
61 As a convenience feature, BackgroundJobManager instances provide the
58 utility result and traceback methods which retrieve the corresponding
62 utility result and traceback methods which retrieve the corresponding
@@ -63,17 +67,15 b' class BackgroundJobManager:'
63
67
64 While this appears minor, it allows you to use tab completion
68 While this appears minor, it allows you to use tab completion
65 interactively on the job manager instance.
69 interactively on the job manager instance.
66
70 """
67 In interactive mode, IPython provides the magic fuction %bg for quick
68 creation of backgrounded expression-based jobs. Type bg? for details."""
69
71
70 def __init__(self):
72 def __init__(self):
71 # Lists for job management
73 # Lists for job management
72 self.jobs_run = []
74 self.running = []
73 self.jobs_comp = []
75 self.completed = []
74 self.jobs_dead = []
76 self.dead = []
75 # A dict of all jobs, so users can easily access any of them
77 # A dict of all jobs, so users can easily access any of them
76 self.jobs_all = {}
78 self.all = {}
77 # For reporting
79 # For reporting
78 self._comp_report = []
80 self._comp_report = []
79 self._dead_report = []
81 self._dead_report = []
@@ -149,7 +151,7 b' class BackgroundJobManager:'
149 if callable(func_or_exp):
151 if callable(func_or_exp):
150 kw = kwargs.get('kw',{})
152 kw = kwargs.get('kw',{})
151 job = BackgroundJobFunc(func_or_exp,*args,**kw)
153 job = BackgroundJobFunc(func_or_exp,*args,**kw)
152 elif isinstance(func_or_exp,basestring):
154 elif isinstance(func_or_exp, basestring):
153 if not args:
155 if not args:
154 frame = sys._getframe(1)
156 frame = sys._getframe(1)
155 glob, loc = frame.f_globals, frame.f_locals
157 glob, loc = frame.f_globals, frame.f_locals
@@ -158,30 +160,28 b' class BackgroundJobManager:'
158 elif len(args)==2:
160 elif len(args)==2:
159 glob,loc = args
161 glob,loc = args
160 else:
162 else:
161 raise ValueError,\
163 raise ValueError(
162 'Expression jobs take at most 2 args (globals,locals)'
164 'Expression jobs take at most 2 args (globals,locals)')
163 job = BackgroundJobExpr(func_or_exp,glob,loc)
165 job = BackgroundJobExpr(func_or_exp, glob, loc)
164 else:
165 raise
166 jkeys = self.jobs_all.keys()
167 if jkeys:
168 job.num = max(jkeys)+1
169 else:
166 else:
170 job.num = 0
167 raise TypeError('invalid args for new job')
171 self.jobs_run.append(job)
168
172 self.jobs_all[job.num] = job
169 job.num = len(self.all)+1 if self.all else 0
170 self.running.append(job)
171 self.all[job.num] = job
173 print 'Starting job # %s in a separate thread.' % job.num
172 print 'Starting job # %s in a separate thread.' % job.num
174 job.start()
173 job.start()
175 return job
174 return job
176
175
177 def __getitem__(self,key):
176 def __getitem__(self, job_key):
178 return self.jobs_all[key]
177 num = job_key if isinstance(job_key, int) else job_key.num
178 return self.all[num]
179
179
180 def __call__(self):
180 def __call__(self):
181 """An alias to self.status(),
181 """An alias to self.status(),
182
182
183 This allows you to simply call a job manager instance much like the
183 This allows you to simply call a job manager instance much like the
184 Unix jobs shell command."""
184 Unix `jobs` shell command."""
185
185
186 return self.status()
186 return self.status()
187
187
@@ -189,29 +189,29 b' class BackgroundJobManager:'
189 """Update the status of the job lists.
189 """Update the status of the job lists.
190
190
191 This method moves finished jobs to one of two lists:
191 This method moves finished jobs to one of two lists:
192 - self.jobs_comp: jobs which completed successfully
192 - self.completed: jobs which completed successfully
193 - self.jobs_dead: jobs which finished but died.
193 - self.dead: jobs which finished but died.
194
194
195 It also copies those jobs to corresponding _report lists. These lists
195 It also copies those jobs to corresponding _report lists. These lists
196 are used to report jobs completed/dead since the last update, and are
196 are used to report jobs completed/dead since the last update, and are
197 then cleared by the reporting function after each call."""
197 then cleared by the reporting function after each call."""
198
198
199 run,comp,dead = self._s_running,self._s_completed,self._s_dead
199 run,comp,dead = self._s_running,self._s_completed,self._s_dead
200 jobs_run = self.jobs_run
200 running = self.running
201 for num in range(len(jobs_run)):
201 for num in range(len(running)):
202 job = jobs_run[num]
202 job = running[num]
203 stat = job.stat_code
203 stat = job.stat_code
204 if stat == run:
204 if stat == run:
205 continue
205 continue
206 elif stat == comp:
206 elif stat == comp:
207 self.jobs_comp.append(job)
207 self.completed.append(job)
208 self._comp_report.append(job)
208 self._comp_report.append(job)
209 jobs_run[num] = False
209 running[num] = False
210 elif stat == dead:
210 elif stat == dead:
211 self.jobs_dead.append(job)
211 self.dead.append(job)
212 self._dead_report.append(job)
212 self._dead_report.append(job)
213 jobs_run[num] = False
213 running[num] = False
214 self.jobs_run = filter(None,self.jobs_run)
214 self.running = filter(None,self.running)
215
215
216 def _group_report(self,group,name):
216 def _group_report(self,group,name):
217 """Report summary for a given job group.
217 """Report summary for a given job group.
@@ -246,7 +246,7 b' class BackgroundJobManager:'
246 which have finished since the last time it was called."""
246 which have finished since the last time it was called."""
247
247
248 self._update_status()
248 self._update_status()
249 new_comp = self._group_report(self._comp_report,'Completed')
249 new_comp = self._group_report(self._comp_report, 'Completed')
250 new_dead = self._group_report(self._dead_report,
250 new_dead = self._group_report(self._dead_report,
251 'Dead, call jobs.traceback() for details')
251 'Dead, call jobs.traceback() for details')
252 self._comp_report[:] = []
252 self._comp_report[:] = []
@@ -257,9 +257,9 b' class BackgroundJobManager:'
257 """Print a status of all jobs currently being managed."""
257 """Print a status of all jobs currently being managed."""
258
258
259 self._update_status()
259 self._update_status()
260 self._group_report(self.jobs_run,'Running')
260 self._group_report(self.running,'Running')
261 self._group_report(self.jobs_comp,'Completed')
261 self._group_report(self.completed,'Completed')
262 self._group_report(self.jobs_dead,'Dead')
262 self._group_report(self.dead,'Dead')
263 # Also flush the report queues
263 # Also flush the report queues
264 self._comp_report[:] = []
264 self._comp_report[:] = []
265 self._dead_report[:] = []
265 self._dead_report[:] = []
@@ -268,7 +268,7 b' class BackgroundJobManager:'
268 """Remove a finished (completed or dead) job."""
268 """Remove a finished (completed or dead) job."""
269
269
270 try:
270 try:
271 job = self.jobs_all[num]
271 job = self.all[num]
272 except KeyError:
272 except KeyError:
273 error('Job #%s not found' % num)
273 error('Job #%s not found' % num)
274 else:
274 else:
@@ -277,12 +277,12 b' class BackgroundJobManager:'
277 error('Job #%s is still running, it can not be removed.' % num)
277 error('Job #%s is still running, it can not be removed.' % num)
278 return
278 return
279 elif stat_code == self._s_completed:
279 elif stat_code == self._s_completed:
280 self.jobs_comp.remove(job)
280 self.completed.remove(job)
281 elif stat_code == self._s_dead:
281 elif stat_code == self._s_dead:
282 self.jobs_dead.remove(job)
282 self.dead.remove(job)
283
283
284 def flush_finished(self):
284 def flush(self):
285 """Flush all jobs finished (completed and dead) from lists.
285 """Flush all finished jobs (completed and dead) from lists.
286
286
287 Running jobs are never flushed.
287 Running jobs are never flushed.
288
288
@@ -296,29 +296,40 b' class BackgroundJobManager:'
296 return
296 return
297
297
298 # Remove the finished jobs from the master dict
298 # Remove the finished jobs from the master dict
299 jobs_all = self.jobs_all
299 all = self.all
300 for job in self.jobs_comp+self.jobs_dead:
300 for job in self.completed+self.dead:
301 del(jobs_all[job.num])
301 del(all[job.num])
302
302
303 # Now flush these lists completely
303 # Now flush these lists completely
304 fl_comp = self._group_flush(self.jobs_comp,'Completed')
304 fl_comp = self._group_flush(self.completed, 'Completed')
305 fl_dead = self._group_flush(self.jobs_dead,'Dead')
305 fl_dead = self._group_flush(self.dead, 'Dead')
306 if not (fl_comp or fl_dead):
306 if not (fl_comp or fl_dead):
307 print 'No jobs to flush.'
307 print 'No jobs to flush.'
308
308
309 def result(self,num):
309 def result(self,num):
310 """result(N) -> return the result of job N."""
310 """result(N) -> return the result of job N."""
311 try:
311 try:
312 return self.jobs_all[num].result
312 return self.all[num].result
313 except KeyError:
313 except KeyError:
314 error('Job #%s not found' % num)
314 error('Job #%s not found' % num)
315
315
316 def traceback(self,num):
316 def _traceback(self, job):
317 num = job if isinstance(job, int) else job.num
317 try:
318 try:
318 self.jobs_all[num].traceback()
319 self.all[num].traceback()
319 except KeyError:
320 except KeyError:
320 error('Job #%s not found' % num)
321 error('Job #%s not found' % num)
321
322
323 def traceback(self, job=None):
324 if job is None:
325 self._update_status()
326 for deadjob in self.dead:
327 print "Traceback for: %r" % deadjob
328 self._traceback(deadjob)
329 print
330 else:
331 self._traceback(job)
332
322
333
323 class BackgroundJobBase(threading.Thread):
334 class BackgroundJobBase(threading.Thread):
324 """Base class to build BackgroundJob classes.
335 """Base class to build BackgroundJob classes.
@@ -360,6 +371,7 b' class BackgroundJobBase(threading.Thread):'
360 self.stat_code = BackgroundJobBase.stat_created_c
371 self.stat_code = BackgroundJobBase.stat_created_c
361 self.finished = False
372 self.finished = False
362 self.result = '<BackgroundJob has not completed>'
373 self.result = '<BackgroundJob has not completed>'
374
363 # reuse the ipython traceback handler if we can get to it, otherwise
375 # reuse the ipython traceback handler if we can get to it, otherwise
364 # make a new one
376 # make a new one
365 try:
377 try:
@@ -368,7 +380,10 b' class BackgroundJobBase(threading.Thread):'
368 make_tb = AutoFormattedTB(mode = 'Context',
380 make_tb = AutoFormattedTB(mode = 'Context',
369 color_scheme='NoColor',
381 color_scheme='NoColor',
370 tb_offset = 1).text
382 tb_offset = 1).text
383 # Note that the actual API for text() requires the three args to be
384 # passed in, so we wrap it in a simple lambda.
371 self._make_tb = lambda : make_tb(None, None, None)
385 self._make_tb = lambda : make_tb(None, None, None)
386
372 # Hold a formatted traceback if one is generated.
387 # Hold a formatted traceback if one is generated.
373 self._tb = None
388 self._tb = None
374
389
@@ -378,7 +393,7 b' class BackgroundJobBase(threading.Thread):'
378 return self.strform
393 return self.strform
379
394
380 def __repr__(self):
395 def __repr__(self):
381 return '<BackgroundJob: %s>' % self.strform
396 return '<BackgroundJob #%d: %s>' % (self.num, self.strform)
382
397
383 def traceback(self):
398 def traceback(self):
384 print self._tb
399 print self._tb
@@ -399,10 +414,11 b' class BackgroundJobBase(threading.Thread):'
399 self.stat_code = BackgroundJobBase.stat_completed_c
414 self.stat_code = BackgroundJobBase.stat_completed_c
400 self.finished = True
415 self.finished = True
401
416
417
402 class BackgroundJobExpr(BackgroundJobBase):
418 class BackgroundJobExpr(BackgroundJobBase):
403 """Evaluate an expression as a background job (uses a separate thread)."""
419 """Evaluate an expression as a background job (uses a separate thread)."""
404
420
405 def __init__(self,expression,glob=None,loc=None):
421 def __init__(self, expression, glob=None, loc=None):
406 """Create a new job from a string which can be fed to eval().
422 """Create a new job from a string which can be fed to eval().
407
423
408 global/locals dicts can be provided, which will be passed to the eval
424 global/locals dicts can be provided, which will be passed to the eval
@@ -411,11 +427,8 b' class BackgroundJobExpr(BackgroundJobBase):'
411 # fail immediately if the given expression can't be compiled
427 # fail immediately if the given expression can't be compiled
412 self.code = compile(expression,'<BackgroundJob compilation>','eval')
428 self.code = compile(expression,'<BackgroundJob compilation>','eval')
413
429
414 if glob is None:
430 glob = {} if glob is None else glob
415 glob = {}
431 loc = {} if loc is None else loc
416 if loc is None:
417 loc = {}
418
419 self.expression = self.strform = expression
432 self.expression = self.strform = expression
420 self.glob = glob
433 self.glob = glob
421 self.loc = loc
434 self.loc = loc
@@ -424,21 +437,19 b' class BackgroundJobExpr(BackgroundJobBase):'
424 def call(self):
437 def call(self):
425 return eval(self.code,self.glob,self.loc)
438 return eval(self.code,self.glob,self.loc)
426
439
440
427 class BackgroundJobFunc(BackgroundJobBase):
441 class BackgroundJobFunc(BackgroundJobBase):
428 """Run a function call as a background job (uses a separate thread)."""
442 """Run a function call as a background job (uses a separate thread)."""
429
443
430 def __init__(self,func,*args,**kwargs):
444 def __init__(self, func, *args, **kwargs):
431 """Create a new job from a callable object.
445 """Create a new job from a callable object.
432
446
433 Any positional arguments and keyword args given to this constructor
447 Any positional arguments and keyword args given to this constructor
434 after the initial callable are passed directly to it."""
448 after the initial callable are passed directly to it."""
435
449
436 assert callable(func),'first argument must be callable'
450 if not callable(func):
437
451 raise TypeError(
438 if args is None:
452 'first argument to BackgroundJobFunc must be callable')
439 args = []
440 if kwargs is None:
441 kwargs = {}
442
453
443 self.func = func
454 self.func = func
444 self.args = args
455 self.args = args
@@ -450,44 +461,4 b' class BackgroundJobFunc(BackgroundJobBase):'
450 self._init()
461 self._init()
451
462
452 def call(self):
463 def call(self):
453 return self.func(*self.args,**self.kwargs)
464 return self.func(*self.args, **self.kwargs)
454
455
456 if __name__=='__main__':
457
458 import sys
459 import time
460
461 def sleepfunc(interval=2,*a,**kw):
462 args = dict(interval=interval,
463 args=a,
464 kwargs=kw)
465 time.sleep(interval)
466 return args
467
468 def diefunc(interval=2,*a,**kw):
469 time.sleep(interval)
470 die
471
472 def printfunc(interval=1,reps=5):
473 for n in range(reps):
474 time.sleep(interval)
475 print 'In the background...', n
476 sys.stdout.flush()
477
478 jobs = BackgroundJobManager()
479 # first job will have # 0
480 jobs.new(sleepfunc,4)
481 jobs.new(sleepfunc,kw={'reps':2})
482 # This makes a job which will die
483 jobs.new(diefunc,1)
484 jobs.new('printfunc(1,3)')
485
486 # after a while, you can get the traceback of a dead job. Run the line
487 # below again interactively until it prints a traceback (check the status
488 # of the job):
489 print jobs[1].status
490 jobs[1].traceback()
491
492 # Run this line again until the printed result changes
493 print "The result of job #0 is:",jobs[0].result
General Comments 0
You need to be logged in to leave comments. Login now