##// END OF EJS Templates
Merge pull request #10825 from rs2/master...
Thomas Kluyver -
r24025:5f1d8312 merge
parent child Browse files
Show More
@@ -1,489 +1,489
1 1 # -*- coding: utf-8 -*-
2 2 """Manage background (threaded) jobs conveniently from an interactive shell.
3 3
4 4 This module provides a BackgroundJobManager class. This is the main class
5 5 meant for public usage, it implements an object which can create and manage
6 6 new background jobs.
7 7
8 8 It also provides the actual job classes managed by these BackgroundJobManager
9 9 objects, see their docstrings below.
10 10
11 11
12 12 This system was inspired by discussions with B. Granger and the
13 13 BackgroundCommand class described in the book Python Scripting for
14 14 Computational Science, by H. P. Langtangen:
15 15
16 16 http://folk.uio.no/hpl/scripting
17 17
18 18 (although ultimately no code from this text was used, as IPython's system is a
19 19 separate implementation).
20 20
21 21 An example notebook is provided in our documentation illustrating interactive
22 22 use of the system.
23 23 """
24 24
25 25 #*****************************************************************************
26 26 # Copyright (C) 2005-2006 Fernando Perez <fperez@colorado.edu>
27 27 #
28 28 # Distributed under the terms of the BSD License. The full license is in
29 29 # the file COPYING, distributed as part of this software.
30 30 #*****************************************************************************
31 31
32 32 # Code begins
33 33 import sys
34 34 import threading
35 35
36 36 from IPython import get_ipython
37 37 from IPython.core.ultratb import AutoFormattedTB
38 from logging import error
38 from logging import error, debug
39 39
40 40
class BackgroundJobManager(object):
    """Class to manage a pool of backgrounded threaded jobs.

    Below, we assume that 'jobs' is a BackgroundJobManager instance.

    Usage summary (see the method docstrings for details):

      jobs.new(...) -> start a new job

      jobs() or jobs.status() -> print status summary of all jobs

      jobs[N] -> returns job number N.

      foo = jobs[N].result -> assign to variable foo the result of job N

      jobs[N].traceback() -> print the traceback of dead job N

      jobs.remove(N) -> remove (finished) job N

      jobs.flush() -> remove all finished jobs

    As a convenience feature, BackgroundJobManager instances provide the
    utility result and traceback methods which retrieve the corresponding
    information from the jobs list:

      jobs.result(N) <--> jobs[N].result
      jobs.traceback(N) <--> jobs[N].traceback()

    While this appears minor, it allows you to use tab completion
    interactively on the job manager instance.
    """

    def __init__(self):
        # Lists for job management, accessed via a property to ensure they're
        # up to date.
        self._running = []
        self._completed = []
        self._dead = []
        # A dict of all jobs, so users can easily access any of them
        self.all = {}
        # For reporting
        self._comp_report = []
        self._dead_report = []
        # Store status codes locally for fast lookups
        self._s_created = BackgroundJobBase.stat_created_c
        self._s_running = BackgroundJobBase.stat_running_c
        self._s_completed = BackgroundJobBase.stat_completed_c
        self._s_dead = BackgroundJobBase.stat_dead_c

    @property
    def running(self):
        """List of jobs not yet finished, refreshed on every access."""
        self._update_status()
        return self._running

    @property
    def dead(self):
        """List of jobs that died with an exception, refreshed on access."""
        self._update_status()
        return self._dead

    @property
    def completed(self):
        """List of jobs that completed normally, refreshed on access."""
        self._update_status()
        return self._completed

    def _next_job_num(self):
        """Return a job number that is not currently used in self.all.

        The historical numbering (0 for the first job, then len(all)+1) is
        kept as the starting point, but that value can coincide with the
        number of a job that is still registered once earlier jobs have been
        removed or flushed.  In that case the number is bumped until it is
        free, so an existing job is never overwritten in self.all.
        """
        num = len(self.all) + 1 if self.all else 0
        while num in self.all:
            num += 1
        return num

    def new(self, func_or_exp, *args, **kwargs):
        """Add a new background job and start it in a separate thread.

        There are two types of jobs which can be created:

        1. Jobs based on expressions which can be passed to an eval() call.
        The expression must be given as a string.  For example:

          job_manager.new('myfunc(x,y,z=1)'[,glob[,loc]])

        The given expression is passed to eval(), along with the optional
        global/local dicts provided.  If no dicts are given, they are
        extracted automatically from the caller's frame.

        A Python statement is NOT a valid eval() expression.  Basically, you
        can only use as an eval() argument something which can go on the right
        of an '=' sign and be assigned to a variable.

        For example,"print 'hello'" is not valid, but '2+3' is.

        2. Jobs given a function object, optionally passing additional
        positional arguments:

          job_manager.new(myfunc, x, y)

        The function is called with the given arguments.

        If you need to pass keyword arguments to your function, you must
        supply them as a dict named kw:

          job_manager.new(myfunc, x, y, kw=dict(z=1))

        The reason for this asymmetry is that the new() method needs to
        maintain access to its own keywords, and this prevents name collisions
        between arguments to new() and arguments to your own functions.

        In both cases, the result is stored in the job.result field of the
        background job object.

        You can set `daemon` attribute of the thread by giving the keyword
        argument `daemon`.

        Returns the started job object.  Raises TypeError if the first
        argument is neither a callable nor a string, and ValueError if an
        expression job is given more than two extra positional arguments.

        Notes and caveats:

        1. All threads running share the same standard output.  Thus, if your
        background jobs generate output, it will come out on top of whatever
        you are currently writing.  For this reason, background jobs are best
        used with silent functions which simply return their output.

        2. Threads also all work within the same global namespace, and this
        system does not lock interactive variables.  So if you send job to the
        background which operates on a mutable object for a long time, and
        start modifying that same mutable object interactively (or in another
        backgrounded job), all sorts of bizarre behaviour will occur.

        3. If a background job is spending a lot of time inside a C extension
        module which does not release the Python Global Interpreter Lock
        (GIL), this will block the IPython prompt.  This is simply because the
        Python interpreter can only switch between threads at Python
        bytecodes.  While the execution is inside C code, the interpreter must
        simply wait unless the extension module releases the GIL.

        4. There is no way, due to limitations in the Python threads library,
        to kill a thread once it has started."""

        if callable(func_or_exp):
            kw = kwargs.get('kw', {})
            job = BackgroundJobFunc(func_or_exp, *args, **kw)
        elif isinstance(func_or_exp, str):
            if not args:
                # Must stay directly in this method: frame 1 is only the
                # caller's frame when looked up from new() itself.
                frame = sys._getframe(1)
                glob, loc = frame.f_globals, frame.f_locals
            elif len(args) == 1:
                glob = loc = args[0]
            elif len(args) == 2:
                glob, loc = args
            else:
                raise ValueError(
                    'Expression jobs take at most 2 args (globals,locals)')
            job = BackgroundJobExpr(func_or_exp, glob, loc)
        else:
            raise TypeError('invalid args for new job')

        if kwargs.get('daemon', False):
            job.daemon = True
        job.num = self._next_job_num()
        self.running.append(job)
        self.all[job.num] = job
        debug('Starting job # %s in a separate thread.', job.num)
        job.start()
        return job

    def __getitem__(self, job_key):
        """Return a job given either its number or the job object itself."""
        num = job_key if isinstance(job_key, int) else job_key.num
        return self.all[num]

    def __call__(self):
        """An alias to self.status(),

        This allows you to simply call a job manager instance much like the
        Unix `jobs` shell command."""

        return self.status()

    def _update_status(self):
        """Update the status of the job lists.

        This method moves finished jobs to one of two lists:
          - self.completed: jobs which completed successfully
          - self.dead: jobs which finished but died.

        It also copies those jobs to corresponding _report lists.  These lists
        are used to report jobs completed/dead since the last update, and are
        then cleared by the reporting function after each call."""

        # Status codes
        scomp, sdead = self._s_completed, self._s_dead
        # State lists, use the actual lists b/c the public names are properties
        # that call this very function on access
        running, completed, dead = self._running, self._completed, self._dead

        # Sort every tracked job into its bucket; anything that is neither
        # completed nor dead (running, or created but not yet started) stays
        # in the running list.
        still_running = []
        for job in running:
            stat = job.stat_code
            if stat == scomp:
                completed.append(job)
                self._comp_report.append(job)
            elif stat == sdead:
                dead.append(job)
                self._dead_report.append(job)
            else:
                still_running.append(job)
        # Update in place so references to the list object remain valid
        running[:] = still_running

    def _group_report(self, group, name):
        """Report summary for a given job group.

        Return True if the group had any elements, False otherwise."""

        if not group:
            return False
        print('%s jobs:' % name)
        for job in group:
            print('%s : %s' % (job.num, job))
        print()
        return True

    def _group_flush(self, group, name):
        """Flush a given job group

        Return True if the group had any elements, False otherwise."""

        njobs = len(group)
        if not njobs:
            return False
        plural = '' if njobs == 1 else 's'
        print('Flushing %s %s job%s.' % (njobs, name, plural))
        group[:] = []
        return True

    def _status_new(self):
        """Print the status of newly finished jobs.

        Return True if any new jobs are reported.

        This call resets its own state every time, so it only reports jobs
        which have finished since the last time it was called."""

        self._update_status()
        new_comp = self._group_report(self._comp_report, 'Completed')
        new_dead = self._group_report(self._dead_report,
                              'Dead, call jobs.traceback() for details')
        self._comp_report[:] = []
        self._dead_report[:] = []
        return new_comp or new_dead

    def status(self, verbose=0):
        """Print a status of all jobs currently being managed.

        The `verbose` argument is accepted but not used."""

        self._update_status()
        self._group_report(self.running, 'Running')
        self._group_report(self.completed, 'Completed')
        self._group_report(self.dead, 'Dead')
        # Also flush the report queues
        self._comp_report[:] = []
        self._dead_report[:] = []

    def remove(self, num):
        """Remove a finished (completed or dead) job.

        The job is dropped from its state list and from the `all` dict, so
        it is forgotten in the same way as by flush().  Unknown job numbers
        and jobs that are still running are reported through logging.error
        and left untouched."""

        try:
            job = self.all[num]
        except KeyError:
            error('Job #%s not found' % num)
        else:
            stat_code = job.stat_code
            if stat_code == self._s_running:
                error('Job #%s is still running, it can not be removed.' % num)
                return
            elif stat_code == self._s_completed:
                self.completed.remove(job)
                del self.all[num]
            elif stat_code == self._s_dead:
                self.dead.remove(job)
                del self.all[num]

    def flush(self):
        """Flush all finished jobs (completed and dead) from lists.

        Running jobs are never flushed.

        The job status is refreshed once at the start; jobs which finish
        while the flush is in progress are left for the next call."""

        # Refresh once and then work on the underlying lists.  Going through
        # the properties again would re-run the update, and a job finishing in
        # between would be cleared from its list without ever being deleted
        # from the master dict.
        self._update_status()
        completed, dead = self._completed, self._dead

        # Remove the finished jobs from the master dict
        alljobs = self.all
        for job in completed + dead:
            del alljobs[job.num]

        # Now flush these lists completely
        fl_comp = self._group_flush(completed, 'Completed')
        fl_dead = self._group_flush(dead, 'Dead')
        if not (fl_comp or fl_dead):
            print('No jobs to flush.')

    def result(self, num):
        """result(N) -> return the result of job N."""
        try:
            return self.all[num].result
        except KeyError:
            error('Job #%s not found' % num)

    def _traceback(self, job):
        """Print the traceback of one job, given as a number or job object."""
        num = job if isinstance(job, int) else job.num
        try:
            self.all[num].traceback()
        except KeyError:
            error('Job #%s not found' % num)

    def traceback(self, job=None):
        """Print the traceback of a job, or of every dead job if none given."""
        if job is None:
            self._update_status()
            for deadjob in self.dead:
                print("Traceback for: %r" % deadjob)
                self._traceback(deadjob)
                print()
        else:
            self._traceback(job)
353 353
354 354
class BackgroundJobBase(threading.Thread):
    """Base class to build BackgroundJob classes.

    The derived classes must implement:

    - Their own __init__, since the one here raises NotImplementedError.  The
      derived constructor must call self._init() at the end, to provide common
      initialization.

    - A strform attribute used in calls to __str__.

    - A call() method, which will make the actual execution call and must
      return a value to be held in the 'result' field of the job object.
    """

    # Class constants for status, in string and as numerical codes (when
    # updating jobs lists, we don't want to do string comparisons).  This will
    # be done at every user prompt, so it has to be as fast as possible
    stat_created = 'Created'
    stat_created_c = 0
    stat_running = 'Running'
    stat_running_c = 1
    stat_completed = 'Completed'
    stat_completed_c = 2
    stat_dead = 'Dead (Exception), call jobs.traceback() for details'
    stat_dead_c = -1

    def __init__(self):
        """Must be implemented in subclasses.

        Subclasses must call :meth:`_init` for standard initialisation.
        """
        raise NotImplementedError("This class can not be instantiated directly.")

    def _init(self):
        """Common initialization for all BackgroundJob objects"""

        for attr in ['call', 'strform']:
            assert hasattr(self, attr), "Missing attribute <%s>" % attr

        # The num tag can be set by an external job manager
        self.num = None

        self.status = BackgroundJobBase.stat_created
        self.stat_code = BackgroundJobBase.stat_created_c
        self.finished = False
        self.result = '<BackgroundJob has not completed>'

        # reuse the ipython traceback handler if we can get to it, otherwise
        # make a new one
        try:
            make_tb = get_ipython().InteractiveTB.text
        except Exception:
            # Only ordinary failures of the lookup trigger the fallback;
            # KeyboardInterrupt/SystemExit are allowed to propagate.
            make_tb = AutoFormattedTB(mode='Context',
                                      color_scheme='NoColor',
                                      tb_offset=1).text
        # Note that the actual API for text() requires the three args to be
        # passed in, so we wrap it in a simple lambda.
        self._make_tb = lambda: make_tb(None, None, None)

        # Hold a formatted traceback if one is generated.
        self._tb = None

        threading.Thread.__init__(self)

    def __str__(self):
        return self.strform

    def __repr__(self):
        # %s rather than %d: num stays None until a manager assigns it, and
        # repr() must not raise for such a job.
        return '<BackgroundJob #%s: %s>' % (self.num, self.strform)

    def traceback(self):
        """Print the formatted traceback stored when the job died, if any."""
        print(self._tb)

    def run(self):
        """Thread body: execute call() and record the outcome on the job.

        On success the return value is stored in `result` and the job is
        marked completed.  Any exception marks the job dead, stores a
        formatted traceback in `_tb` and sets `finished` to None.
        """
        try:
            self.status = BackgroundJobBase.stat_running
            self.stat_code = BackgroundJobBase.stat_running_c
            self.result = self.call()
        except BaseException:
            # Deliberately catch everything: whatever ended the job, it is
            # recorded as dead rather than escaping from the thread.
            self.status = BackgroundJobBase.stat_dead
            self.stat_code = BackgroundJobBase.stat_dead_c
            self.finished = None
            self.result = ('<BackgroundJob died, call jobs.traceback() for details>')
            self._tb = self._make_tb()
        else:
            self.status = BackgroundJobBase.stat_completed
            self.stat_code = BackgroundJobBase.stat_completed_c
            self.finished = True
441 441
442 442
class BackgroundJobExpr(BackgroundJobBase):
    """Background job that evaluates an expression in its own thread."""

    def __init__(self, expression, glob=None, loc=None):
        """Build a job from a string suitable for eval().

        `glob` and `loc` are the optional globals/locals dicts handed to
        eval(); an empty dict is used for whichever one is omitted."""

        # Compile up front so a malformed expression raises right here,
        # before any job state is set up.
        self.code = compile(expression, '<BackgroundJob compilation>', 'eval')

        self.expression = expression
        self.strform = expression
        self.glob = glob if glob is not None else {}
        self.loc = loc if loc is not None else {}
        self._init()

    def call(self):
        """Evaluate the compiled expression and return its value."""
        value = eval(self.code, self.glob, self.loc)
        return value
464 464
465 465
class BackgroundJobFunc(BackgroundJobBase):
    """Background job that runs a function call in its own thread."""

    def __init__(self, func, *args, **kwargs):
        """Build a job from a callable.

        Positional and keyword arguments following the callable are stored
        and handed to it unchanged when the job runs.  A TypeError is raised
        if `func` is not callable."""

        if not callable(func):
            raise TypeError(
                'first argument to BackgroundJobFunc must be callable')

        self.func, self.args, self.kwargs = func, args, kwargs
        # Only the function itself goes into the string form: rendering the
        # arguments could be _very_ expensive (e.g. with large arrays).
        self.strform = str(func)
        self._init()

    def call(self):
        """Invoke the stored callable with the stored arguments."""
        target = self.func
        return target(*self.args, **self.kwargs)
General Comments 0
You need to be logged in to leave comments. Login now