##// END OF EJS Templates
Cleanup the API of backgroundjobs to be more convenient to use....
Fernando Perez -
Show More
@@ -1,493 +1,464 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Manage background (threaded) jobs conveniently from an interactive shell.
3 3
4 4 This module provides a BackgroundJobManager class. This is the main class
5 5 meant for public usage, it implements an object which can create and manage
6 6 new background jobs.
7 7
8 8 It also provides the actual job classes managed by these BackgroundJobManager
9 9 objects, see their docstrings below.
10 10
11 11
12 12 This system was inspired by discussions with B. Granger and the
13 13 BackgroundCommand class described in the book Python Scripting for
14 14 Computational Science, by H. P. Langtangen:
15 15
16 16 http://folk.uio.no/hpl/scripting
17 17
18 18 (although ultimately no code from this text was used, as IPython's system is a
19 19 separate implementation).
20
21 An example notebook is provided in our documentation illustrating interactive
22 use of the system.
20 23 """
21 24
22 25 #*****************************************************************************
23 26 # Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
24 27 #
25 28 # Distributed under the terms of the BSD License. The full license is in
26 29 # the file COPYING, distributed as part of this software.
27 30 #*****************************************************************************
28 31
29 32 # Code begins
30 33 import sys
31 34 import threading
32 35
33 36 from IPython.core.ultratb import AutoFormattedTB
34 37 from IPython.utils.warn import warn, error
35 38
36 class BackgroundJobManager:
39
40 class BackgroundJobManager(object):
37 41 """Class to manage a pool of backgrounded threaded jobs.
38 42
39 43 Below, we assume that 'jobs' is a BackgroundJobManager instance.
40 44
41 45 Usage summary (see the method docstrings for details):
42 46
43 47 jobs.new(...) -> start a new job
44 48
45 49 jobs() or jobs.status() -> print status summary of all jobs
46 50
47 51 jobs[N] -> returns job number N.
48 52
49 53 foo = jobs[N].result -> assign to variable foo the result of job N
50 54
51 55 jobs[N].traceback() -> print the traceback of dead job N
52 56
53 57 jobs.remove(N) -> remove (finished) job N
54 58
55 jobs.flush_finished() -> remove all finished jobs
59 jobs.flush() -> remove all finished jobs
56 60
57 61 As a convenience feature, BackgroundJobManager instances provide the
58 62 utility result and traceback methods which retrieve the corresponding
59 63 information from the jobs list:
60 64
61 65 jobs.result(N) <--> jobs[N].result
62 66 jobs.traceback(N) <--> jobs[N].traceback()
63 67
64 68 While this appears minor, it allows you to use tab completion
65 69 interactively on the job manager instance.
66
67 In interactive mode, IPython provides the magic fuction %bg for quick
68 creation of backgrounded expression-based jobs. Type bg? for details."""
70 """
69 71
70 72 def __init__(self):
71 73 # Lists for job management
72 self.jobs_run = []
73 self.jobs_comp = []
74 self.jobs_dead = []
74 self.running = []
75 self.completed = []
76 self.dead = []
75 77 # A dict of all jobs, so users can easily access any of them
76 self.jobs_all = {}
78 self.all = {}
77 79 # For reporting
78 80 self._comp_report = []
79 81 self._dead_report = []
80 82 # Store status codes locally for fast lookups
81 83 self._s_created = BackgroundJobBase.stat_created_c
82 84 self._s_running = BackgroundJobBase.stat_running_c
83 85 self._s_completed = BackgroundJobBase.stat_completed_c
84 86 self._s_dead = BackgroundJobBase.stat_dead_c
85 87
86 88 def new(self,func_or_exp,*args,**kwargs):
87 89 """Add a new background job and start it in a separate thread.
88 90
89 91 There are two types of jobs which can be created:
90 92
91 93 1. Jobs based on expressions which can be passed to an eval() call.
92 94 The expression must be given as a string. For example:
93 95
94 96 job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])
95 97
96 98 The given expression is passed to eval(), along with the optional
97 99 global/local dicts provided. If no dicts are given, they are
98 100 extracted automatically from the caller's frame.
99 101
100 102 A Python statement is NOT a valid eval() expression. Basically, you
101 103 can only use as an eval() argument something which can go on the right
102 104 of an '=' sign and be assigned to a variable.
103 105
104 106 For example,"print 'hello'" is not valid, but '2+3' is.
105 107
106 108 2. Jobs given a function object, optionally passing additional
107 109 positional arguments:
108 110
109 111 job_manager.new(myfunc,x,y)
110 112
111 113 The function is called with the given arguments.
112 114
113 115 If you need to pass keyword arguments to your function, you must
114 116 supply them as a dict named kw:
115 117
116 118 job_manager.new(myfunc,x,y,kw=dict(z=1))
117 119
118 120 The reason for this assymmetry is that the new() method needs to
119 121 maintain access to its own keywords, and this prevents name collisions
120 122 between arguments to new() and arguments to your own functions.
121 123
122 124 In both cases, the result is stored in the job.result field of the
123 125 background job object.
124 126
125 127
126 128 Notes and caveats:
127 129
128 130 1. All threads running share the same standard output. Thus, if your
129 131 background jobs generate output, it will come out on top of whatever
130 132 you are currently writing. For this reason, background jobs are best
131 133 used with silent functions which simply return their output.
132 134
133 135 2. Threads also all work within the same global namespace, and this
134 136 system does not lock interactive variables. So if you send job to the
135 137 background which operates on a mutable object for a long time, and
136 138 start modifying that same mutable object interactively (or in another
137 139 backgrounded job), all sorts of bizarre behaviour will occur.
138 140
139 141 3. If a background job is spending a lot of time inside a C extension
140 142 module which does not release the Python Global Interpreter Lock
141 143 (GIL), this will block the IPython prompt. This is simply because the
142 144 Python interpreter can only switch between threads at Python
143 145 bytecodes. While the execution is inside C code, the interpreter must
144 146 simply wait unless the extension module releases the GIL.
145 147
146 148 4. There is no way, due to limitations in the Python threads library,
147 149 to kill a thread once it has started."""
148 150
149 151 if callable(func_or_exp):
150 152 kw = kwargs.get('kw',{})
151 153 job = BackgroundJobFunc(func_or_exp,*args,**kw)
152 elif isinstance(func_or_exp,basestring):
154 elif isinstance(func_or_exp, basestring):
153 155 if not args:
154 156 frame = sys._getframe(1)
155 157 glob, loc = frame.f_globals, frame.f_locals
156 158 elif len(args)==1:
157 159 glob = loc = args[0]
158 160 elif len(args)==2:
159 161 glob,loc = args
160 162 else:
161 raise ValueError,\
162 'Expression jobs take at most 2 args (globals,locals)'
163 job = BackgroundJobExpr(func_or_exp,glob,loc)
164 else:
165 raise
166 jkeys = self.jobs_all.keys()
167 if jkeys:
168 job.num = max(jkeys)+1
163 raise ValueError(
164 'Expression jobs take at most 2 args (globals,locals)')
165 job = BackgroundJobExpr(func_or_exp, glob, loc)
169 166 else:
170 job.num = 0
171 self.jobs_run.append(job)
172 self.jobs_all[job.num] = job
167 raise TypeError('invalid args for new job')
168
169 job.num = len(self.all)+1 if self.all else 0
170 self.running.append(job)
171 self.all[job.num] = job
173 172 print 'Starting job # %s in a separate thread.' % job.num
174 173 job.start()
175 174 return job
176 175
177 def __getitem__(self,key):
178 return self.jobs_all[key]
176 def __getitem__(self, job_key):
177 num = job_key if isinstance(job_key, int) else job_key.num
178 return self.all[num]
179 179
180 180 def __call__(self):
181 181 """An alias to self.status(),
182 182
183 183 This allows you to simply call a job manager instance much like the
184 Unix jobs shell command."""
184 Unix `jobs` shell command."""
185 185
186 186 return self.status()
187 187
188 188 def _update_status(self):
189 189 """Update the status of the job lists.
190 190
191 191 This method moves finished jobs to one of two lists:
192 - self.jobs_comp: jobs which completed successfully
193 - self.jobs_dead: jobs which finished but died.
192 - self.completed: jobs which completed successfully
193 - self.dead: jobs which finished but died.
194 194
195 195 It also copies those jobs to corresponding _report lists. These lists
196 196 are used to report jobs completed/dead since the last update, and are
197 197 then cleared by the reporting function after each call."""
198 198
199 199 run,comp,dead = self._s_running,self._s_completed,self._s_dead
200 jobs_run = self.jobs_run
201 for num in range(len(jobs_run)):
202 job = jobs_run[num]
200 running = self.running
201 for num in range(len(running)):
202 job = running[num]
203 203 stat = job.stat_code
204 204 if stat == run:
205 205 continue
206 206 elif stat == comp:
207 self.jobs_comp.append(job)
207 self.completed.append(job)
208 208 self._comp_report.append(job)
209 jobs_run[num] = False
209 running[num] = False
210 210 elif stat == dead:
211 self.jobs_dead.append(job)
211 self.dead.append(job)
212 212 self._dead_report.append(job)
213 jobs_run[num] = False
214 self.jobs_run = filter(None,self.jobs_run)
213 running[num] = False
214 self.running = filter(None,self.running)
215 215
216 216 def _group_report(self,group,name):
217 217 """Report summary for a given job group.
218 218
219 219 Return True if the group had any elements."""
220 220
221 221 if group:
222 222 print '%s jobs:' % name
223 223 for job in group:
224 224 print '%s : %s' % (job.num,job)
225 225 print
226 226 return True
227 227
228 228 def _group_flush(self,group,name):
229 229 """Flush a given job group
230 230
231 231 Return True if the group had any elements."""
232 232
233 233 njobs = len(group)
234 234 if njobs:
235 235 plural = {1:''}.setdefault(njobs,'s')
236 236 print 'Flushing %s %s job%s.' % (njobs,name,plural)
237 237 group[:] = []
238 238 return True
239 239
240 240 def _status_new(self):
241 241 """Print the status of newly finished jobs.
242 242
243 243 Return True if any new jobs are reported.
244 244
245 245 This call resets its own state every time, so it only reports jobs
246 246 which have finished since the last time it was called."""
247 247
248 248 self._update_status()
249 new_comp = self._group_report(self._comp_report,'Completed')
249 new_comp = self._group_report(self._comp_report, 'Completed')
250 250 new_dead = self._group_report(self._dead_report,
251 251 'Dead, call jobs.traceback() for details')
252 252 self._comp_report[:] = []
253 253 self._dead_report[:] = []
254 254 return new_comp or new_dead
255 255
256 256 def status(self,verbose=0):
257 257 """Print a status of all jobs currently being managed."""
258 258
259 259 self._update_status()
260 self._group_report(self.jobs_run,'Running')
261 self._group_report(self.jobs_comp,'Completed')
262 self._group_report(self.jobs_dead,'Dead')
260 self._group_report(self.running,'Running')
261 self._group_report(self.completed,'Completed')
262 self._group_report(self.dead,'Dead')
263 263 # Also flush the report queues
264 264 self._comp_report[:] = []
265 265 self._dead_report[:] = []
266 266
267 267 def remove(self,num):
268 268 """Remove a finished (completed or dead) job."""
269 269
270 270 try:
271 job = self.jobs_all[num]
271 job = self.all[num]
272 272 except KeyError:
273 273 error('Job #%s not found' % num)
274 274 else:
275 275 stat_code = job.stat_code
276 276 if stat_code == self._s_running:
277 277 error('Job #%s is still running, it can not be removed.' % num)
278 278 return
279 279 elif stat_code == self._s_completed:
280 self.jobs_comp.remove(job)
280 self.completed.remove(job)
281 281 elif stat_code == self._s_dead:
282 self.jobs_dead.remove(job)
282 self.dead.remove(job)
283 283
284 def flush_finished(self):
285 """Flush all jobs finished (completed and dead) from lists.
284 def flush(self):
285 """Flush all finished jobs (completed and dead) from lists.
286 286
287 287 Running jobs are never flushed.
288 288
289 289 It first calls _status_new(), to update info. If any jobs have
290 290 completed since the last _status_new() call, the flush operation
291 291 aborts."""
292 292
293 293 if self._status_new():
294 294 error('New jobs completed since last '\
295 295 '_status_new(), aborting flush.')
296 296 return
297 297
298 298 # Remove the finished jobs from the master dict
299 jobs_all = self.jobs_all
300 for job in self.jobs_comp+self.jobs_dead:
301 del(jobs_all[job.num])
299 all = self.all
300 for job in self.completed+self.dead:
301 del(all[job.num])
302 302
303 303 # Now flush these lists completely
304 fl_comp = self._group_flush(self.jobs_comp,'Completed')
305 fl_dead = self._group_flush(self.jobs_dead,'Dead')
304 fl_comp = self._group_flush(self.completed, 'Completed')
305 fl_dead = self._group_flush(self.dead, 'Dead')
306 306 if not (fl_comp or fl_dead):
307 307 print 'No jobs to flush.'
308 308
309 309 def result(self,num):
310 310 """result(N) -> return the result of job N."""
311 311 try:
312 return self.jobs_all[num].result
312 return self.all[num].result
313 313 except KeyError:
314 314 error('Job #%s not found' % num)
315 315
316 def traceback(self,num):
316 def _traceback(self, job):
317 num = job if isinstance(job, int) else job.num
317 318 try:
318 self.jobs_all[num].traceback()
319 self.all[num].traceback()
319 320 except KeyError:
320 321 error('Job #%s not found' % num)
321 322
323 def traceback(self, job=None):
324 if job is None:
325 self._update_status()
326 for deadjob in self.dead:
327 print "Traceback for: %r" % deadjob
328 self._traceback(deadjob)
329 print
330 else:
331 self._traceback(job)
332
322 333
323 334 class BackgroundJobBase(threading.Thread):
324 335 """Base class to build BackgroundJob classes.
325 336
326 337 The derived classes must implement:
327 338
328 339 - Their own __init__, since the one here raises NotImplementedError. The
329 340 derived constructor must call self._init() at the end, to provide common
330 341 initialization.
331 342
332 343 - A strform attribute used in calls to __str__.
333 344
334 345 - A call() method, which will make the actual execution call and must
335 346 return a value to be held in the 'result' field of the job object."""
336 347
337 348 # Class constants for status, in string and as numerical codes (when
338 349 # updating jobs lists, we don't want to do string comparisons). This will
339 350 # be done at every user prompt, so it has to be as fast as possible
340 351 stat_created = 'Created'; stat_created_c = 0
341 352 stat_running = 'Running'; stat_running_c = 1
342 353 stat_completed = 'Completed'; stat_completed_c = 2
343 354 stat_dead = 'Dead (Exception), call jobs.traceback() for details'
344 355 stat_dead_c = -1
345 356
346 357 def __init__(self):
347 358 raise NotImplementedError, \
348 359 "This class can not be instantiated directly."
349 360
350 361 def _init(self):
351 362 """Common initialization for all BackgroundJob objects"""
352 363
353 364 for attr in ['call','strform']:
354 365 assert hasattr(self,attr), "Missing attribute <%s>" % attr
355 366
356 367 # The num tag can be set by an external job manager
357 368 self.num = None
358 369
359 370 self.status = BackgroundJobBase.stat_created
360 371 self.stat_code = BackgroundJobBase.stat_created_c
361 372 self.finished = False
362 373 self.result = '<BackgroundJob has not completed>'
374
363 375 # reuse the ipython traceback handler if we can get to it, otherwise
364 376 # make a new one
365 377 try:
366 378 make_tb = get_ipython().InteractiveTB.text
367 379 except:
368 380 make_tb = AutoFormattedTB(mode = 'Context',
369 381 color_scheme='NoColor',
370 382 tb_offset = 1).text
383 # Note that the actual API for text() requires the three args to be
384 # passed in, so we wrap it in a simple lambda.
371 385 self._make_tb = lambda : make_tb(None, None, None)
386
372 387 # Hold a formatted traceback if one is generated.
373 388 self._tb = None
374 389
375 390 threading.Thread.__init__(self)
376 391
377 392 def __str__(self):
378 393 return self.strform
379 394
380 395 def __repr__(self):
381 return '<BackgroundJob: %s>' % self.strform
396 return '<BackgroundJob #%d: %s>' % (self.num, self.strform)
382 397
383 398 def traceback(self):
384 399 print self._tb
385 400
386 401 def run(self):
387 402 try:
388 403 self.status = BackgroundJobBase.stat_running
389 404 self.stat_code = BackgroundJobBase.stat_running_c
390 405 self.result = self.call()
391 406 except:
392 407 self.status = BackgroundJobBase.stat_dead
393 408 self.stat_code = BackgroundJobBase.stat_dead_c
394 409 self.finished = None
395 410 self.result = ('<BackgroundJob died, call jobs.traceback() for details>')
396 411 self._tb = self._make_tb()
397 412 else:
398 413 self.status = BackgroundJobBase.stat_completed
399 414 self.stat_code = BackgroundJobBase.stat_completed_c
400 415 self.finished = True
401 416
417
402 418 class BackgroundJobExpr(BackgroundJobBase):
403 419 """Evaluate an expression as a background job (uses a separate thread)."""
404 420
405 def __init__(self,expression,glob=None,loc=None):
421 def __init__(self, expression, glob=None, loc=None):
406 422 """Create a new job from a string which can be fed to eval().
407 423
408 424 global/locals dicts can be provided, which will be passed to the eval
409 425 call."""
410 426
411 427 # fail immediately if the given expression can't be compiled
412 428 self.code = compile(expression,'<BackgroundJob compilation>','eval')
413 429
414 if glob is None:
415 glob = {}
416 if loc is None:
417 loc = {}
418
430 glob = {} if glob is None else glob
431 loc = {} if loc is None else loc
419 432 self.expression = self.strform = expression
420 433 self.glob = glob
421 434 self.loc = loc
422 435 self._init()
423 436
424 437 def call(self):
425 438 return eval(self.code,self.glob,self.loc)
426 439
440
427 441 class BackgroundJobFunc(BackgroundJobBase):
428 442 """Run a function call as a background job (uses a separate thread)."""
429 443
430 def __init__(self,func,*args,**kwargs):
444 def __init__(self, func, *args, **kwargs):
431 445 """Create a new job from a callable object.
432 446
433 447 Any positional arguments and keyword args given to this constructor
434 448 after the initial callable are passed directly to it."""
435 449
436 assert callable(func),'first argument must be callable'
437
438 if args is None:
439 args = []
440 if kwargs is None:
441 kwargs = {}
450 if not callable(func):
451 raise TypeError(
452 'first argument to BackgroundJobFunc must be callable')
442 453
443 454 self.func = func
444 455 self.args = args
445 456 self.kwargs = kwargs
446 457 # The string form will only include the function passed, because
447 458 # generating string representations of the arguments is a potentially
448 459 # _very_ expensive operation (e.g. with large arrays).
449 460 self.strform = str(func)
450 461 self._init()
451 462
452 463 def call(self):
453 return self.func(*self.args,**self.kwargs)
454
455
456 if __name__=='__main__':
457
458 import sys
459 import time
460
461 def sleepfunc(interval=2,*a,**kw):
462 args = dict(interval=interval,
463 args=a,
464 kwargs=kw)
465 time.sleep(interval)
466 return args
467
468 def diefunc(interval=2,*a,**kw):
469 time.sleep(interval)
470 die
471
472 def printfunc(interval=1,reps=5):
473 for n in range(reps):
474 time.sleep(interval)
475 print 'In the background...', n
476 sys.stdout.flush()
477
478 jobs = BackgroundJobManager()
479 # first job will have # 0
480 jobs.new(sleepfunc,4)
481 jobs.new(sleepfunc,kw={'reps':2})
482 # This makes a job which will die
483 jobs.new(diefunc,1)
484 jobs.new('printfunc(1,3)')
485
486 # after a while, you can get the traceback of a dead job. Run the line
487 # below again interactively until it prints a traceback (check the status
488 # of the job):
489 print jobs[1].status
490 jobs[1].traceback()
491
492 # Run this line again until the printed result changes
493 print "The result of job #0 is:",jobs[0].result
464 return self.func(*self.args, **self.kwargs)
General Comments 0
You need to be logged in to leave comments. Login now