##// END OF EJS Templates
Brian Granger -
Show More
@@ -1,490 +1,490 b''
1 1 # -*- coding: utf-8 -*-
2 2 """Manage background (threaded) jobs conveniently from an interactive shell.
3 3
4 4 This module provides a BackgroundJobManager class. This is the main class
5 5 meant for public usage, it implements an object which can create and manage
6 6 new background jobs.
7 7
8 8 It also provides the actual job classes managed by these BackgroundJobManager
9 9 objects, see their docstrings below.
10 10
11 11
12 12 This system was inspired by discussions with B. Granger and the
13 13 BackgroundCommand class described in the book Python Scripting for
14 14 Computational Science, by H. P. Langtangen:
15 15
16 16 http://folk.uio.no/hpl/scripting
17 17
18 18 (although ultimately no code from this text was used, as IPython's system is a
19 19 separate implementation).
20 20 """
21 21
22 22 #*****************************************************************************
23 23 # Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
24 24 #
25 25 # Distributed under the terms of the BSD License. The full license is in
26 26 # the file COPYING, distributed as part of this software.
27 27 #*****************************************************************************
28 28
29 29 # Code begins
30 30 import sys
31 31 import threading
32 32
33 33 from IPython.ultraTB import AutoFormattedTB
34 34 from IPython.genutils import warn,error
35 35
36 36 class BackgroundJobManager:
37 37 """Class to manage a pool of backgrounded threaded jobs.
38 38
39 39 Below, we assume that 'jobs' is a BackgroundJobManager instance.
40 40
41 41 Usage summary (see the method docstrings for details):
42 42
43 43 jobs.new(...) -> start a new job
44 44
45 45 jobs() or jobs.status() -> print status summary of all jobs
46 46
47 47 jobs[N] -> returns job number N.
48 48
49 49 foo = jobs[N].result -> assign to variable foo the result of job N
50 50
51 51 jobs[N].traceback() -> print the traceback of dead job N
52 52
53 53 jobs.remove(N) -> remove (finished) job N
54 54
55 55 jobs.flush_finished() -> remove all finished jobs
56 56
57 57 As a convenience feature, BackgroundJobManager instances provide the
58 58 utility result and traceback methods which retrieve the corresponding
59 59 information from the jobs list:
60 60
61 61 jobs.result(N) <--> jobs[N].result
62 62 jobs.traceback(N) <--> jobs[N].traceback()
63 63
64 64 While this appears minor, it allows you to use tab completion
65 65 interactively on the job manager instance.
66 66
67 67 In interactive mode, IPython provides the magic fuction %bg for quick
68 68 creation of backgrounded expression-based jobs. Type bg? for details."""
69 69
70 70 def __init__(self):
71 71 # Lists for job management
72 72 self.jobs_run = []
73 73 self.jobs_comp = []
74 74 self.jobs_dead = []
75 75 # A dict of all jobs, so users can easily access any of them
76 76 self.jobs_all = {}
77 77 # For reporting
78 78 self._comp_report = []
79 79 self._dead_report = []
80 80 # Store status codes locally for fast lookups
81 81 self._s_created = BackgroundJobBase.stat_created_c
82 82 self._s_running = BackgroundJobBase.stat_running_c
83 83 self._s_completed = BackgroundJobBase.stat_completed_c
84 84 self._s_dead = BackgroundJobBase.stat_dead_c
85 85
86 86 def new(self,func_or_exp,*args,**kwargs):
87 87 """Add a new background job and start it in a separate thread.
88 88
89 89 There are two types of jobs which can be created:
90 90
91 91 1. Jobs based on expressions which can be passed to an eval() call.
92 92 The expression must be given as a string. For example:
93 93
94 94 job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])
95 95
96 96 The given expression is passed to eval(), along with the optional
97 97 global/local dicts provided. If no dicts are given, they are
98 98 extracted automatically from the caller's frame.
99 99
100 100 A Python statement is NOT a valid eval() expression. Basically, you
101 101 can only use as an eval() argument something which can go on the right
102 102 of an '=' sign and be assigned to a variable.
103 103
104 104 For example,"print 'hello'" is not valid, but '2+3' is.
105 105
106 106 2. Jobs given a function object, optionally passing additional
107 107 positional arguments:
108 108
109 109 job_manager.new(myfunc,x,y)
110 110
111 111 The function is called with the given arguments.
112 112
113 113 If you need to pass keyword arguments to your function, you must
114 114 supply them as a dict named kw:
115 115
116 116 job_manager.new(myfunc,x,y,kw=dict(z=1))
117 117
118 118 The reason for this assymmetry is that the new() method needs to
119 119 maintain access to its own keywords, and this prevents name collisions
120 120 between arguments to new() and arguments to your own functions.
121 121
122 122 In both cases, the result is stored in the job.result field of the
123 123 background job object.
124 124
125 125
126 126 Notes and caveats:
127 127
128 128 1. All threads running share the same standard output. Thus, if your
129 129 background jobs generate output, it will come out on top of whatever
130 130 you are currently writing. For this reason, background jobs are best
131 131 used with silent functions which simply return their output.
132 132
133 133 2. Threads also all work within the same global namespace, and this
134 134 system does not lock interactive variables. So if you send job to the
135 135 background which operates on a mutable object for a long time, and
136 136 start modifying that same mutable object interactively (or in another
137 137 backgrounded job), all sorts of bizarre behaviour will occur.
138 138
139 139 3. If a background job is spending a lot of time inside a C extension
140 140 module which does not release the Python Global Interpreter Lock
141 141 (GIL), this will block the IPython prompt. This is simply because the
142 142 Python interpreter can only switch between threads at Python
143 143 bytecodes. While the execution is inside C code, the interpreter must
144 144 simply wait unless the extension module releases the GIL.
145 145
146 146 4. There is no way, due to limitations in the Python threads library,
147 147 to kill a thread once it has started."""
148 148
149 149 if callable(func_or_exp):
150 150 kw = kwargs.get('kw',{})
151 151 job = BackgroundJobFunc(func_or_exp,*args,**kw)
152 152 elif isinstance(func_or_exp,basestring):
153 153 if not args:
154 154 frame = sys._getframe(1)
155 155 glob, loc = frame.f_globals, frame.f_locals
156 156 elif len(args)==1:
157 157 glob = loc = args[0]
158 158 elif len(args)==2:
159 159 glob,loc = args
160 160 else:
161 161 raise ValueError,\
162 162 'Expression jobs take at most 2 args (globals,locals)'
163 163 job = BackgroundJobExpr(func_or_exp,glob,loc)
164 164 else:
165 165 raise
166 166 jkeys = self.jobs_all.keys()
167 167 if jkeys:
168 168 job.num = max(jkeys)+1
169 169 else:
170 170 job.num = 0
171 171 self.jobs_run.append(job)
172 172 self.jobs_all[job.num] = job
173 173 print 'Starting job # %s in a separate thread.' % job.num
174 174 job.start()
175 175 return job
176 176
177 177 def __getitem__(self,key):
178 178 return self.jobs_all[key]
179 179
180 180 def __call__(self):
181 181 """An alias to self.status(),
182 182
183 183 This allows you to simply call a job manager instance much like the
184 184 Unix jobs shell command."""
185 185
186 186 return self.status()
187 187
188 188 def _update_status(self):
189 189 """Update the status of the job lists.
190 190
191 191 This method moves finished jobs to one of two lists:
192 192 - self.jobs_comp: jobs which completed successfully
193 193 - self.jobs_dead: jobs which finished but died.
194 194
195 195 It also copies those jobs to corresponding _report lists. These lists
196 196 are used to report jobs completed/dead since the last update, and are
197 197 then cleared by the reporting function after each call."""
198 198
199 199 run,comp,dead = self._s_running,self._s_completed,self._s_dead
200 200 jobs_run = self.jobs_run
201 201 for num in range(len(jobs_run)):
202 202 job = jobs_run[num]
203 203 stat = job.stat_code
204 204 if stat == run:
205 205 continue
206 206 elif stat == comp:
207 207 self.jobs_comp.append(job)
208 208 self._comp_report.append(job)
209 209 jobs_run[num] = False
210 210 elif stat == dead:
211 211 self.jobs_dead.append(job)
212 212 self._dead_report.append(job)
213 213 jobs_run[num] = False
214 214 self.jobs_run = filter(None,self.jobs_run)
215 215
216 216 def _group_report(self,group,name):
217 217 """Report summary for a given job group.
218 218
219 219 Return True if the group had any elements."""
220 220
221 221 if group:
222 222 print '%s jobs:' % name
223 223 for job in group:
224 224 print '%s : %s' % (job.num,job)
225 225 print
226 226 return True
227 227
228 228 def _group_flush(self,group,name):
229 229 """Flush a given job group
230 230
231 231 Return True if the group had any elements."""
232 232
233 233 njobs = len(group)
234 234 if njobs:
235 235 plural = {1:''}.setdefault(njobs,'s')
236 236 print 'Flushing %s %s job%s.' % (njobs,name,plural)
237 237 group[:] = []
238 238 return True
239 239
240 240 def _status_new(self):
241 241 """Print the status of newly finished jobs.
242 242
243 243 Return True if any new jobs are reported.
244 244
245 245 This call resets its own state every time, so it only reports jobs
246 246 which have finished since the last time it was called."""
247 247
248 248 self._update_status()
249 249 new_comp = self._group_report(self._comp_report,'Completed')
250 250 new_dead = self._group_report(self._dead_report,
251 'Dead, call job.traceback() for details')
251 'Dead, call jobs.traceback() for details')
252 252 self._comp_report[:] = []
253 253 self._dead_report[:] = []
254 254 return new_comp or new_dead
255 255
256 256 def status(self,verbose=0):
257 257 """Print a status of all jobs currently being managed."""
258 258
259 259 self._update_status()
260 260 self._group_report(self.jobs_run,'Running')
261 261 self._group_report(self.jobs_comp,'Completed')
262 262 self._group_report(self.jobs_dead,'Dead')
263 263 # Also flush the report queues
264 264 self._comp_report[:] = []
265 265 self._dead_report[:] = []
266 266
267 267 def remove(self,num):
268 268 """Remove a finished (completed or dead) job."""
269 269
270 270 try:
271 271 job = self.jobs_all[num]
272 272 except KeyError:
273 273 error('Job #%s not found' % num)
274 274 else:
275 275 stat_code = job.stat_code
276 276 if stat_code == self._s_running:
277 277 error('Job #%s is still running, it can not be removed.' % num)
278 278 return
279 279 elif stat_code == self._s_completed:
280 280 self.jobs_comp.remove(job)
281 281 elif stat_code == self._s_dead:
282 282 self.jobs_dead.remove(job)
283 283
284 284 def flush_finished(self):
285 285 """Flush all jobs finished (completed and dead) from lists.
286 286
287 287 Running jobs are never flushed.
288 288
289 289 It first calls _status_new(), to update info. If any jobs have
290 290 completed since the last _status_new() call, the flush operation
291 291 aborts."""
292 292
293 293 if self._status_new():
294 294 error('New jobs completed since last '\
295 295 '_status_new(), aborting flush.')
296 296 return
297 297
298 298 # Remove the finished jobs from the master dict
299 299 jobs_all = self.jobs_all
300 300 for job in self.jobs_comp+self.jobs_dead:
301 301 del(jobs_all[job.num])
302 302
303 303 # Now flush these lists completely
304 304 fl_comp = self._group_flush(self.jobs_comp,'Completed')
305 305 fl_dead = self._group_flush(self.jobs_dead,'Dead')
306 306 if not (fl_comp or fl_dead):
307 307 print 'No jobs to flush.'
308 308
309 309 def result(self,num):
310 310 """result(N) -> return the result of job N."""
311 311 try:
312 312 return self.jobs_all[num].result
313 313 except KeyError:
314 314 error('Job #%s not found' % num)
315 315
316 316 def traceback(self,num):
317 317 try:
318 318 self.jobs_all[num].traceback()
319 319 except KeyError:
320 320 error('Job #%s not found' % num)
321 321
322 322
323 323 class BackgroundJobBase(threading.Thread):
324 324 """Base class to build BackgroundJob classes.
325 325
326 326 The derived classes must implement:
327 327
328 328 - Their own __init__, since the one here raises NotImplementedError. The
329 329 derived constructor must call self._init() at the end, to provide common
330 330 initialization.
331 331
332 332 - A strform attribute used in calls to __str__.
333 333
334 334 - A call() method, which will make the actual execution call and must
335 335 return a value to be held in the 'result' field of the job object."""
336 336
337 337 # Class constants for status, in string and as numerical codes (when
338 338 # updating jobs lists, we don't want to do string comparisons). This will
339 339 # be done at every user prompt, so it has to be as fast as possible
340 340 stat_created = 'Created'; stat_created_c = 0
341 341 stat_running = 'Running'; stat_running_c = 1
342 342 stat_completed = 'Completed'; stat_completed_c = 2
343 stat_dead = 'Dead (Exception), call job.traceback() for details'
343 stat_dead = 'Dead (Exception), call jobs.traceback() for details'
344 344 stat_dead_c = -1
345 345
346 346 def __init__(self):
347 347 raise NotImplementedError, \
348 348 "This class can not be instantiated directly."
349 349
350 350 def _init(self):
351 351 """Common initialization for all BackgroundJob objects"""
352 352
353 353 for attr in ['call','strform']:
354 354 assert hasattr(self,attr), "Missing attribute <%s>" % attr
355 355
356 356 # The num tag can be set by an external job manager
357 357 self.num = None
358 358
359 359 self.status = BackgroundJobBase.stat_created
360 360 self.stat_code = BackgroundJobBase.stat_created_c
361 361 self.finished = False
362 362 self.result = '<BackgroundJob has not completed>'
363 363 # reuse the ipython traceback handler if we can get to it, otherwise
364 364 # make a new one
365 365 try:
366 366 self._make_tb = __IPYTHON__.InteractiveTB.text
367 367 except:
368 368 self._make_tb = AutoFormattedTB(mode = 'Context',
369 369 color_scheme='NoColor',
370 370 tb_offset = 1).text
371 371 # Hold a formatted traceback if one is generated.
372 372 self._tb = None
373 373
374 374 threading.Thread.__init__(self)
375 375
376 376 def __str__(self):
377 377 return self.strform
378 378
379 379 def __repr__(self):
380 380 return '<BackgroundJob: %s>' % self.strform
381 381
382 382 def traceback(self):
383 383 print self._tb
384 384
385 385 def run(self):
386 386 try:
387 387 self.status = BackgroundJobBase.stat_running
388 388 self.stat_code = BackgroundJobBase.stat_running_c
389 389 self.result = self.call()
390 390 except:
391 391 self.status = BackgroundJobBase.stat_dead
392 392 self.stat_code = BackgroundJobBase.stat_dead_c
393 393 self.finished = None
394 self.result = ('<BackgroundJob died, call job.traceback() for details>')
394 self.result = ('<BackgroundJob died, call jobs.traceback() for details>')
395 395 self._tb = self._make_tb()
396 396 else:
397 397 self.status = BackgroundJobBase.stat_completed
398 398 self.stat_code = BackgroundJobBase.stat_completed_c
399 399 self.finished = True
400 400
401 401 class BackgroundJobExpr(BackgroundJobBase):
402 402 """Evaluate an expression as a background job (uses a separate thread)."""
403 403
404 404 def __init__(self,expression,glob=None,loc=None):
405 405 """Create a new job from a string which can be fed to eval().
406 406
407 407 global/locals dicts can be provided, which will be passed to the eval
408 408 call."""
409 409
410 410 # fail immediately if the given expression can't be compiled
411 411 self.code = compile(expression,'<BackgroundJob compilation>','eval')
412 412
413 413 if glob is None:
414 414 glob = {}
415 415 if loc is None:
416 416 loc = {}
417 417
418 418 self.expression = self.strform = expression
419 419 self.glob = glob
420 420 self.loc = loc
421 421 self._init()
422 422
423 423 def call(self):
424 424 return eval(self.code,self.glob,self.loc)
425 425
426 426 class BackgroundJobFunc(BackgroundJobBase):
427 427 """Run a function call as a background job (uses a separate thread)."""
428 428
429 429 def __init__(self,func,*args,**kwargs):
430 430 """Create a new job from a callable object.
431 431
432 432 Any positional arguments and keyword args given to this constructor
433 433 after the initial callable are passed directly to it."""
434 434
435 435 assert callable(func),'first argument must be callable'
436 436
437 437 if args is None:
438 438 args = []
439 439 if kwargs is None:
440 440 kwargs = {}
441 441
442 442 self.func = func
443 443 self.args = args
444 444 self.kwargs = kwargs
445 445 # The string form will only include the function passed, because
446 446 # generating string representations of the arguments is a potentially
447 447 # _very_ expensive operation (e.g. with large arrays).
448 448 self.strform = str(func)
449 449 self._init()
450 450
451 451 def call(self):
452 452 return self.func(*self.args,**self.kwargs)
453 453
454 454
455 455 if __name__=='__main__':
456 456
457 457 import time
458 458
459 459 def sleepfunc(interval=2,*a,**kw):
460 460 args = dict(interval=interval,
461 461 args=a,
462 462 kwargs=kw)
463 463 time.sleep(interval)
464 464 return args
465 465
466 466 def diefunc(interval=2,*a,**kw):
467 467 time.sleep(interval)
468 468 die
469 469
470 470 def printfunc(interval=1,reps=5):
471 471 for n in range(reps):
472 472 time.sleep(interval)
473 473 print 'In the background...'
474 474
475 475 jobs = BackgroundJobManager()
476 476 # first job will have # 0
477 477 jobs.new(sleepfunc,4)
478 478 jobs.new(sleepfunc,kw={'reps':2})
479 479 # This makes a job which will die
480 480 jobs.new(diefunc,1)
481 481 jobs.new('printfunc(1,3)')
482 482
483 483 # after a while, you can get the traceback of a dead job. Run the line
484 484 # below again interactively until it prints a traceback (check the status
485 485 # of the job):
486 486 print jobs[1].status
487 487 jobs[1].traceback()
488 488
489 489 # Run this line again until the printed result changes
490 490 print "The result of job #0 is:",jobs[0].result
General Comments 0
You need to be logged in to leave comments. Login now